Skip to content

Commit 61bfd1f

Browse files
committed
Lockless plan consumption
1 parent 846c491 commit 61bfd1f

File tree

4 files changed

+127
-260
lines changed

4 files changed

+127
-260
lines changed

src/from_substrait.cpp

Lines changed: 45 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,24 @@
1616
#include "google/protobuf/util/json_util.h"
1717
#include "substrait/plan.pb.h"
1818

19-
#include "substrait_relations.hpp"
20-
#include "duckdb/common/helper.hpp"
2119
#include "duckdb/main/table_description.hpp"
20+
2221
#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
22+
#include "duckdb/common/helper.hpp"
23+
24+
#include "duckdb/main/relation.hpp"
25+
#include "duckdb/main/relation/table_relation.hpp"
26+
#include "duckdb/main/relation/table_function_relation.hpp"
27+
#include "duckdb/main/relation/value_relation.hpp"
28+
#include "duckdb/main/relation/view_relation.hpp"
29+
#include "duckdb/main/relation/aggregate_relation.hpp"
30+
#include "duckdb/main/relation/cross_product_relation.hpp"
31+
#include "duckdb/main/relation/filter_relation.hpp"
32+
#include "duckdb/main/relation/join_relation.hpp"
33+
#include "duckdb/main/relation/limit_relation.hpp"
34+
#include "duckdb/main/relation/order_relation.hpp"
35+
#include "duckdb/main/relation/projection_relation.hpp"
36+
#include "duckdb/main/relation/setop_relation.hpp"
2337

2438
namespace duckdb {
2539
const std::unordered_map<std::string, std::string> SubstraitToDuckDB::function_names_remap = {
@@ -61,8 +75,8 @@ string SubstraitToDuckDB::RemoveExtension(const string &function_name) {
6175
return name;
6276
}
6377

64-
65-
SubstraitToDuckDB::SubstraitToDuckDB(shared_ptr<ClientContext> &context_p, const string &serialized, bool json):context(context_p) {
78+
SubstraitToDuckDB::SubstraitToDuckDB(shared_ptr<ClientContext> &context_p, const string &serialized, bool json)
79+
: context(context_p) {
6680
if (!json) {
6781
if (!plan.ParseFromString(serialized)) {
6882
throw std::runtime_error("Was not possible to convert binary into Substrait plan");
@@ -437,28 +451,28 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformJoinOp(const substrait::Rel &so
437451
throw InternalException("Unsupported join type");
438452
}
439453
unique_ptr<ParsedExpression> join_condition = TransformExpr(sjoin.expression());
440-
return make_shared_ptr<SubstraitJoinRelation>(TransformOp(sjoin.left())->Alias("left"),
454+
return make_shared_ptr<JoinRelation>(TransformOp(sjoin.left())->Alias("left"),
441455
TransformOp(sjoin.right())->Alias("right"), std::move(join_condition),
442456
djointype);
443457
}
444458

445459
shared_ptr<Relation> SubstraitToDuckDB::TransformCrossProductOp(const substrait::Rel &sop) {
446460
auto &sub_cross = sop.cross();
447461

448-
return make_shared_ptr<SubstraitCrossProductRelation>(TransformOp(sub_cross.left())->Alias("left"),
462+
return make_shared_ptr<CrossProductRelation>(TransformOp(sub_cross.left())->Alias("left"),
449463
TransformOp(sub_cross.right())->Alias("right"));
450464
}
451465

452466
shared_ptr<Relation> SubstraitToDuckDB::TransformFetchOp(const substrait::Rel &sop) {
453467
auto &slimit = sop.fetch();
454468
idx_t limit = slimit.count() == -1 ? NumericLimits<idx_t>::Maximum() : slimit.count();
455469
idx_t offset = slimit.offset();
456-
return make_shared_ptr<SubstraitLimitRelation>(TransformOp(slimit.input()), limit, offset);
470+
return make_shared_ptr<LimitRelation>(TransformOp(slimit.input()), limit, offset);
457471
}
458472

459473
shared_ptr<Relation> SubstraitToDuckDB::TransformFilterOp(const substrait::Rel &sop) {
460474
auto &sfilter = sop.filter();
461-
return make_shared_ptr<SubstraitFilterRelation>(TransformOp(sfilter.input()), TransformExpr(sfilter.condition()));
475+
return make_shared_ptr<FilterRelation>(TransformOp(sfilter.input()), TransformExpr(sfilter.condition()));
462476
}
463477

464478
shared_ptr<Relation> SubstraitToDuckDB::TransformProjectOp(const substrait::Rel &sop) {
@@ -471,7 +485,7 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformProjectOp(const substrait::Rel
471485
for (size_t i = 0; i < expressions.size(); i++) {
472486
mock_aliases.push_back("expr_" + to_string(i));
473487
}
474-
return make_shared_ptr<SubstraitProjectionRelation>(TransformOp(sop.project().input()), std::move(expressions),
488+
return make_shared_ptr<ProjectionRelation>(TransformOp(sop.project().input()), std::move(expressions),
475489
std::move(mock_aliases));
476490
}
477491

@@ -503,16 +517,16 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformAggregateOp(const substrait::Re
503517
nullptr, nullptr, is_distinct));
504518
}
505519

506-
return make_shared_ptr<SubstraitAggregateRelation>(TransformOp(sop.aggregate().input()), std::move(expressions),
520+
return make_shared_ptr<AggregateRelation>(TransformOp(sop.aggregate().input()), std::move(expressions),
507521
std::move(groups));
508522
}
509-
unique_ptr<TableDescription> TableInfo(ClientContext& context, const string &schema_name, const string &table_name) {
523+
unique_ptr<TableDescription> TableInfo(ClientContext &context, const string &schema_name, const string &table_name) {
510524
unique_ptr<TableDescription> result;
511525
// obtain the table info
512526
auto table = Catalog::GetEntry<TableCatalogEntry>(context, INVALID_CATALOG, schema_name, table_name,
513-
OnEntryNotFound::RETURN_NULL);
527+
OnEntryNotFound::RETURN_NULL);
514528
if (!table) {
515-
return{};
529+
return {};
516530
}
517531
// write the table info to the result
518532
result = make_uniq<TableDescription>();
@@ -531,13 +545,13 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
531545
auto table_name = sget.named_table().names(0);
532546
// If we can't find a table with that name, let's try a view.
533547
try {
534-
auto table_info =TableInfo(*context, DEFAULT_SCHEMA, table_name);
548+
auto table_info = TableInfo(*context, DEFAULT_SCHEMA, table_name);
535549
if (!table_info) {
536550
throw CatalogException("Table '%s' does not exist!", table_name);
537551
}
538-
return make_shared_ptr<SubstraitTableRelation>(context, std::move(table_info));
552+
return make_shared_ptr<TableRelation>(context, std::move(table_info), false);
539553
} catch (...) {
540-
scan = make_shared_ptr<SubstraitViewRelation>(context, DEFAULT_SCHEMA, table_name);
554+
scan = make_shared_ptr<ViewRelation>(context, DEFAULT_SCHEMA, table_name, false);
541555
}
542556
} else if (sget.has_local_files()) {
543557
vector<Value> parquet_files;
@@ -558,9 +572,11 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
558572
}
559573
string name = "parquet_" + StringUtil::GenerateRandomName();
560574
named_parameter_map_t named_parameters({{"binary_as_string", Value::BOOLEAN(false)}});
561-
// auto scan_rel = make_shared_ptr<SubstraitTableFunctionRelation>(context, "parquet_scan", {Value::LIST(parquet_files)}, named_parameters);
562-
// auto rel = static_cast<Relation*>(scan_rel.get());
563-
// scan = rel->Alias(name);
575+
vector<Value> parameters {Value::LIST(parquet_files)};
576+
auto scan_rel = make_shared_ptr<TableFunctionRelation>(context, "parquet_scan", parameters,
577+
std::move(named_parameters), nullptr, true, false);
578+
auto rel = static_cast<Relation *>(scan_rel.get());
579+
scan = rel->Alias(name);
564580
} else if (sget.has_virtual_table()) {
565581
// We need to handle a virtual table as a LogicalExpressionGet
566582
auto literal_values = sget.virtual_table().values();
@@ -574,13 +590,13 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
574590
expression_rows.emplace_back(expression_row);
575591
}
576592
vector<string> column_names;
577-
scan = make_shared_ptr<SubstraitValueRelation>(context, expression_rows, column_names, "values");
593+
scan = make_shared_ptr<ValueRelation>(context, expression_rows, column_names, "values", false);
578594
} else {
579595
throw NotImplementedException("Unsupported type of read operator for substrait");
580596
}
581597

582598
if (sget.has_filter()) {
583-
scan = make_shared_ptr<SubstraitFilterRelation>(std::move(scan), TransformExpr(sget.filter()));
599+
scan = make_shared_ptr<FilterRelation>(std::move(scan), TransformExpr(sget.filter()));
584600
}
585601

586602
if (sget.has_projection()) {
@@ -593,7 +609,7 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
593609
// TODO make sure nothing else is in there
594610
expressions.push_back(make_uniq<PositionalReferenceExpression>(sproj.field() + 1));
595611
}
596-
scan = make_shared_ptr<SubstraitProjectionRelation>(std::move(scan), std::move(expressions), std::move(aliases));
612+
scan = make_shared_ptr<ProjectionRelation>(std::move(scan), std::move(expressions), std::move(aliases));
597613
}
598614

599615
return scan;
@@ -604,7 +620,7 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformSortOp(const substrait::Rel &so
604620
for (auto &sordf : sop.sort().sorts()) {
605621
order_nodes.push_back(TransformOrder(sordf));
606622
}
607-
return make_shared_ptr<SubstraitOrderRelation>(TransformOp(sop.sort().input()), std::move(order_nodes));
623+
return make_shared_ptr<OrderRelation>(TransformOp(sop.sort().input()), std::move(order_nodes));
608624
}
609625

610626
static SetOperationType TransformSetOperationType(substrait::SetRel_SetOp setop) {
@@ -638,7 +654,7 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformSetOp(const substrait::Rel &sop
638654
auto lhs = TransformOp(inputs[0]);
639655
auto rhs = TransformOp(inputs[1]);
640656

641-
return make_shared_ptr<SubstraitSetOpRelation>(std::move(lhs), std::move(rhs), type);
657+
return make_shared_ptr<SetOpRelation>(std::move(lhs), std::move(rhs), type);
642658
}
643659

644660
shared_ptr<Relation> SubstraitToDuckDB::TransformOp(const substrait::Rel &sop) {
@@ -687,11 +703,11 @@ Relation *GetProjection(Relation &relation) {
687703
case RelationType::PROJECTION_RELATION:
688704
return &relation;
689705
case RelationType::LIMIT_RELATION:
690-
return GetProjection(*relation.Cast<SubstraitLimitRelation>().child);
706+
return GetProjection(*relation.Cast<LimitRelation>().child);
691707
case RelationType::ORDER_RELATION:
692-
return GetProjection(*relation.Cast<SubstraitOrderRelation>().child);
708+
return GetProjection(*relation.Cast<OrderRelation>().child);
693709
case RelationType::SET_OPERATION_RELATION:
694-
return GetProjection(*relation.Cast<SubstraitSetOpRelation>().right);
710+
return GetProjection(*relation.Cast<SetOpRelation>().right);
695711
default:
696712
return nullptr;
697713
}
@@ -705,7 +721,7 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformRootOp(const substrait::RelRoot
705721
auto child = TransformOp(sop.input());
706722
auto first_projection_or_table = GetProjection(*child);
707723
if (first_projection_or_table) {
708-
vector<ColumnDefinition> *column_definitions = &first_projection_or_table->Cast<SubstraitProjectionRelation>().columns;
724+
vector<ColumnDefinition> *column_definitions = &first_projection_or_table->Cast<ProjectionRelation>().columns;
709725
int32_t i = 0;
710726
for (auto &column : *column_definitions) {
711727
aliases.push_back(column_names[i++]);
@@ -720,7 +736,7 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformRootOp(const substrait::RelRoot
720736
}
721737
}
722738

723-
return make_shared_ptr<SubstraitProjectionRelation>(child, std::move(expressions), aliases);
739+
return make_shared_ptr<ProjectionRelation>(child, std::move(expressions), aliases);
724740
}
725741

726742
shared_ptr<Relation> SubstraitToDuckDB::TransformPlan() {

src/include/substrait_relations.hpp

Lines changed: 0 additions & 118 deletions
This file was deleted.

src/include/to_substrait.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
//
77
//===----------------------------------------------------------------------===//
88

9-
109
#pragma once
1110

1211
#include "custom_extensions/custom_extensions.hpp"

0 commit comments

Comments
 (0)