Skip to content

Commit bc9f4b3

Browse files
authored
Merge pull request #113 from pdet/view
Reuse connection to extract, consume, and execute Substrait query plans
2 parents be71387 + 3ac8aae commit bc9f4b3

13 files changed

+325
-159
lines changed

.github/workflows/distribution.yml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,19 @@ concurrency:
2424
jobs:
2525
duckdb-stable-build:
2626
name: Build extension binaries
27-
uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@v1.1.0
27+
uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main
2828
with:
29-
duckdb_version: v1.1.0
30-
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_rtools"
29+
duckdb_version: main
30+
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_mingw"
3131
extension_name: substrait
3232

3333
duckdb-stable-deploy:
3434
name: Deploy extension binaries
3535
needs: duckdb-stable-build
36-
uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@v1.1.0
36+
uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@main
3737
secrets: inherit
3838
with:
39-
duckdb_version: v1.1.0
40-
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_rtools"
39+
duckdb_version: main
40+
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_mingw"
4141
extension_name: substrait
4242
deploy_latest: true

.github/workflows/main_distribution.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@ jobs:
2323
with:
2424
duckdb_version: main
2525
ci_tools_version: main
26-
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_rtools"
26+
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_mingw"
2727
extension_name: substrait
2828

duckdb

Submodule duckdb updated 1863 files

src/from_substrait.cpp

Lines changed: 72 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,6 @@
22

33
#include "duckdb/common/types/value.hpp"
44
#include "duckdb/parser/expression/list.hpp"
5-
#include "duckdb/main/relation/join_relation.hpp"
6-
#include "duckdb/main/relation/cross_product_relation.hpp"
7-
8-
#include "duckdb/main/relation/limit_relation.hpp"
9-
#include "duckdb/main/relation/projection_relation.hpp"
10-
#include "duckdb/main/relation/setop_relation.hpp"
11-
#include "duckdb/main/relation/aggregate_relation.hpp"
12-
#include "duckdb/main/relation/filter_relation.hpp"
13-
#include "duckdb/main/relation/order_relation.hpp"
145
#include "duckdb/main/connection.hpp"
156
#include "duckdb/parser/parser.hpp"
167
#include "duckdb/common/exception.hpp"
@@ -25,7 +16,24 @@
2516
#include "google/protobuf/util/json_util.h"
2617
#include "substrait/plan.pb.h"
2718

19+
#include "duckdb/main/table_description.hpp"
20+
21+
#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
22+
#include "duckdb/common/helper.hpp"
23+
24+
#include "duckdb/main/relation.hpp"
2825
#include "duckdb/main/relation/table_relation.hpp"
26+
#include "duckdb/main/relation/table_function_relation.hpp"
27+
#include "duckdb/main/relation/value_relation.hpp"
28+
#include "duckdb/main/relation/view_relation.hpp"
29+
#include "duckdb/main/relation/aggregate_relation.hpp"
30+
#include "duckdb/main/relation/cross_product_relation.hpp"
31+
#include "duckdb/main/relation/filter_relation.hpp"
32+
#include "duckdb/main/relation/join_relation.hpp"
33+
#include "duckdb/main/relation/limit_relation.hpp"
34+
#include "duckdb/main/relation/order_relation.hpp"
35+
#include "duckdb/main/relation/projection_relation.hpp"
36+
#include "duckdb/main/relation/setop_relation.hpp"
2937

3038
namespace duckdb {
3139
const std::unordered_map<std::string, std::string> SubstraitToDuckDB::function_names_remap = {
@@ -40,7 +48,7 @@ const case_insensitive_set_t SubstraitToDuckDB::valid_extract_subfields = {
4048
"quarter", "microsecond", "milliseconds", "second", "minute", "hour"};
4149

4250
string SubstraitToDuckDB::RemapFunctionName(const string &function_name) {
43-
// Lets first drop any extension id
51+
// Let's first drop any extension id
4452
string name;
4553
for (auto &c : function_name) {
4654
if (c == ':') {
@@ -67,7 +75,9 @@ string SubstraitToDuckDB::RemoveExtension(const string &function_name) {
6775
return name;
6876
}
6977

70-
SubstraitToDuckDB::SubstraitToDuckDB(Connection &con_p, const string &serialized, bool json) : con(con_p) {
78+
SubstraitToDuckDB::SubstraitToDuckDB(shared_ptr<ClientContext> &context_p, const string &serialized, bool json,
79+
bool acquire_lock_p)
80+
: context(context_p), acquire_lock(acquire_lock_p) {
7181
if (!json) {
7282
if (!plan.ParseFromString(serialized)) {
7383
throw std::runtime_error("Was not possible to convert binary into Substrait plan");
@@ -510,16 +520,46 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformAggregateOp(const substrait::Re
510520
return make_shared_ptr<AggregateRelation>(TransformOp(sop.aggregate().input()), std::move(expressions),
511521
std::move(groups));
512522
}
523+
//! Builds a TableDescription for table `table_name` in schema `schema_name`.
//! The table is looked up through Catalog::GetEntry using INVALID_CATALOG as the catalog name.
//! Returns an empty (null) unique_ptr when the lookup yields no table entry; otherwise the
//! returned description holds a copy of every column yielded by GetColumns().Logical().
unique_ptr<TableDescription> TableInfo(ClientContext &context, const string &schema_name, const string &table_name) {
524+
// Obtain the table's catalog entry; RETURN_NULL makes a missing table yield null here
525+
auto table = Catalog::GetEntry<TableCatalogEntry>(context, INVALID_CATALOG, schema_name, table_name,
526+
OnEntryNotFound::RETURN_NULL);
527+
if (!table) {
528+
return {};
529+
}
530+
// Write the table info to the result: catalog/schema/table names first, then a copy of each column
531+
auto result = make_uniq<TableDescription>(INVALID_CATALOG, schema_name, table_name);
532+
for (auto &column : table->GetColumns().Logical()) {
533+
result->columns.emplace_back(column.Copy());
534+
}
535+
return result;
536+
}
513537

514538
shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &sop) {
515539
auto &sget = sop.read();
516540
shared_ptr<Relation> scan;
541+
auto context_wrapper = make_shared_ptr<RelationContextWrapper>(context);
517542
if (sget.has_named_table()) {
543+
auto table_name = sget.named_table().names(0);
518544
// If we can't find a table with that name, let's try a view.
519545
try {
520-
scan = con.Table(sget.named_table().names(0));
546+
auto table_info = TableInfo(*context, DEFAULT_SCHEMA, table_name);
547+
if (!table_info) {
548+
throw CatalogException("Table '%s' does not exist!", table_name);
549+
}
550+
if (acquire_lock) {
551+
scan = make_shared_ptr<TableRelation>(context, std::move(table_info));
552+
553+
} else {
554+
scan = make_shared_ptr<TableRelation>(context_wrapper, std::move(table_info));
555+
}
521556
} catch (...) {
522-
scan = con.View(sget.named_table().names(0));
557+
if (acquire_lock) {
558+
scan = make_shared_ptr<ViewRelation>(context, DEFAULT_SCHEMA, table_name);
559+
560+
} else {
561+
scan = make_shared_ptr<ViewRelation>(context_wrapper, DEFAULT_SCHEMA, table_name);
562+
}
523563
}
524564
} else if (sget.has_local_files()) {
525565
vector<Value> parquet_files;
@@ -540,7 +580,18 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
540580
}
541581
string name = "parquet_" + StringUtil::GenerateRandomName();
542582
named_parameter_map_t named_parameters({{"binary_as_string", Value::BOOLEAN(false)}});
543-
scan = con.TableFunction("parquet_scan", {Value::LIST(parquet_files)}, named_parameters)->Alias(name);
583+
vector<Value> parameters {Value::LIST(parquet_files)};
584+
shared_ptr<TableFunctionRelation> scan_rel;
585+
if (acquire_lock) {
586+
scan_rel = make_shared_ptr<TableFunctionRelation>(context, "parquet_scan", parameters,
587+
std::move(named_parameters));
588+
} else {
589+
scan_rel = make_shared_ptr<TableFunctionRelation>(context_wrapper, "parquet_scan", parameters,
590+
std::move(named_parameters));
591+
}
592+
593+
auto rel = static_cast<Relation *>(scan_rel.get());
594+
scan = rel->Alias(name);
544595
} else if (sget.has_virtual_table()) {
545596
// We need to handle a virtual table as a LogicalExpressionGet
546597
auto literal_values = sget.virtual_table().values();
@@ -553,7 +604,13 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
553604
}
554605
expression_rows.emplace_back(expression_row);
555606
}
556-
scan = con.Values(expression_rows);
607+
vector<string> column_names;
608+
if (acquire_lock) {
609+
scan = make_shared_ptr<ValueRelation>(context, expression_rows, column_names);
610+
611+
} else {
612+
scan = make_shared_ptr<ValueRelation>(context_wrapper, expression_rows, column_names);
613+
}
557614
} else {
558615
throw NotImplementedException("Unsupported type of read operator for substrait");
559616
}

src/include/from_substrait.hpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
//===----------------------------------------------------------------------===//
2+
// DuckDB
3+
//
4+
// from_substrait.hpp
5+
//
6+
//
7+
//===----------------------------------------------------------------------===//
8+
19
#pragma once
210

311
#include <string>
@@ -10,7 +18,8 @@ namespace duckdb {
1018

1119
class SubstraitToDuckDB {
1220
public:
13-
SubstraitToDuckDB(Connection &con_p, const string &serialized, bool json = false);
21+
SubstraitToDuckDB(shared_ptr<ClientContext> &context_p, const string &serialized, bool json = false,
22+
bool acquire_lock = false);
1423
//! Transforms Substrait Plan to DuckDB Relation
1524
shared_ptr<Relation> TransformPlan();
1625

@@ -48,8 +57,8 @@ class SubstraitToDuckDB {
4857

4958
//! Transform Substrait Sort Order to DuckDB Order
5059
OrderByNode TransformOrder(const substrait::SortField &sordf);
51-
//! DuckDB Connection
52-
Connection &con;
60+
//! DuckDB Client Context
61+
shared_ptr<ClientContext> context;
5362
//! Substrait Plan
5463
substrait::Plan plan;
5564
//! Variable used to register functions
@@ -59,5 +68,7 @@ class SubstraitToDuckDB {
5968
static const unordered_map<std::string, std::string> function_names_remap;
6069
static const case_insensitive_set_t valid_extract_subfields;
6170
vector<ParsedExpression *> struct_expressions;
71+
//! If we should acquire a client context lock when creating the relations
72+
const bool acquire_lock;
6273
};
6374
} // namespace duckdb

src/include/to_substrait.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
//===----------------------------------------------------------------------===//
2+
// DuckDB
3+
//
4+
// to_substrait.hpp
5+
//
6+
//
7+
//===----------------------------------------------------------------------===//
8+
19
#pragma once
210

311
#include "custom_extensions/custom_extensions.hpp"

0 commit comments

Comments
 (0)