From af676464794070d5b23618b1b1c15d1cafd6085c Mon Sep 17 00:00:00 2001
From: lmangani
Date: Mon, 16 Dec 2024 13:14:31 +0000
Subject: [PATCH] named params

---
 README.md                  |  94 +++++------------------
 src/kafquack_extension.cpp | 152 ++++++++++++++++++++++++-------------
 2 files changed, 118 insertions(+), 128 deletions(-)

diff --git a/README.md b/README.md
index f1124b1..606673a 100644
--- a/README.md
+++ b/README.md
@@ -1,86 +1,26 @@
-# Kafquack
+# DuckDB Kafka Extension
 
-This repository is based on https://github.com/duckdb/extension-template, check it out if you want to build and ship your own DuckDB extension.
+This extension allows you to consume Kafka messages through a DuckDB table function that connects to a Kafka broker and streams incoming messages as rows of a table.
 
----
+> The extension is a WIP and not yet functional! Join if you're willing to contribute!
 
-This extension, Kafquack, allow you to ... .
+## Examples
 
-
-## Building
-### Managing dependencies
-DuckDB extensions uses VCPKG for dependency management. Enabling VCPKG is very simple: follow the [installation instructions](https://vcpkg.io/en/getting-started) or just run the following:
-```shell
-git clone https://github.com/Microsoft/vcpkg.git
-./vcpkg/bootstrap-vcpkg.sh
-export VCPKG_TOOLCHAIN_PATH=`pwd`/vcpkg/scripts/buildsystems/vcpkg.cmake
-```
-Note: VCPKG is only required for extensions that want to rely on it for dependency management. If you want to develop an extension without dependencies, or want to do your own dependency management, just skip this step. Note that the example extension uses VCPKG to build with a dependency for instructive purposes, so when skipping this step the build may not work without removing the dependency.
-
-### Build steps
-Now to build the extension, run:
-```sh
-make
-```
-The main binaries that will be built are:
-```sh
-./build/release/duckdb
-./build/release/test/unittest
-./build/release/extension/kafquack/kafquack.duckdb_extension
-```
-- `duckdb` is the binary for the duckdb shell with the extension code automatically loaded.
-- `unittest` is the test runner of duckdb. Again, the extension is already linked into the binary.
-- `kafquack.duckdb_extension` is the loadable binary as it would be distributed.
-
-## Running the extension
-To run the extension code, simply start the shell with `./build/release/duckdb`.
-
-Now we can use the features from the extension directly in DuckDB. The template contains a single scalar function `kafquack()` that takes a string arguments and returns a string:
-```
-D select kafquack('Jane') as result;
-┌──────────────────┐
-│      result      │
-│     varchar      │
-├──────────────────┤
-│ Kafquack Jane 🐥 │
-└──────────────────┘
-```
-
-## Running the tests
-Different tests can be created for DuckDB extensions. The primary way of testing DuckDB extensions should be the SQL tests in `./test/sql`. These SQL tests can be run using:
-```sh
-make test
-```
-
-### Installing the deployed binaries
-To install your extension binaries from S3, you will need to do two things. Firstly, DuckDB should be launched with the
-`allow_unsigned_extensions` option set to true. How to set this will depend on the client you're using.
-Some examples:
-
-CLI:
-```shell
-duckdb -unsigned
-```
-
-Python:
-```python
-con = duckdb.connect(':memory:', config={'allow_unsigned_extensions' : 'true'})
-```
-
-NodeJS:
-```js
-db = new duckdb.Database(':memory:', {"allow_unsigned_extensions": "true"});
-```
-
-Secondly, you will need to set the repository endpoint in DuckDB to the HTTP url of your bucket + version of the extension
-you want to install. To do this run the following SQL query in DuckDB:
+#### Basic usage:
 ```sql
-SET custom_extension_repository='bucket.s3.eu-west-1.amazonaws.com/<your_extension_name>/latest';
+SELECT * FROM kafka_consumer('localhost:9092', 'test-topic', 'test-group');
 ```
-Note that the `/latest` path will allow you to install the latest extension version available for your current version of
-DuckDB. To specify a specific version, you can pass the version instead.
-
-After running these steps, you can install and load your extension using the regular INSTALL/LOAD commands in DuckDB:
+#### Secure usage:
+
 ```sql
-INSTALL kafquack
-LOAD kafquack
+SELECT * FROM kafka_consumer(
+    'broker.some.cloud:9092',
+    'test-topic',
+    'test-group',
+    security_protocol := 'SASL_SSL',
+    sasl_mechanism := 'PLAIN',
+    username := 'your-key',
+    password := 'your-secret'
+);
 ```
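+#### Reading a bounded sample:
+
+The function exposes `topic`, `partition`, `offset`, `timestamp`, `key`, `value`, and `error` columns, as defined in the bind function below. As an illustrative sketch (assuming the streaming scan stops once the limit is satisfied), a bounded read could look like:
+
+```sql
+SELECT "timestamp", key, value
+FROM kafka_consumer('localhost:9092', 'test-topic', 'test-group')
+LIMIT 100;
+```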
"security_protocol") { + result->security_protocol = StringValue::Get(kv.second); + if (result->security_protocol != "SASL_SSL" && result->security_protocol != "SASL_PLAINTEXT") { + throw InvalidInputException("security_protocol must be either SASL_SSL or SASL_PLAINTEXT"); + } + } else if (kv.first == "sasl_mechanism") { + result->sasl_mechanism = StringValue::Get(kv.second); + if (result->sasl_mechanism != "SCRAM-SHA-256" && result->sasl_mechanism != "PLAIN") { + throw InvalidInputException("sasl_mechanism must be either SCRAM-SHA-256 or PLAIN"); + } + } else if (kv.first == "username") { + result->username = StringValue::Get(kv.second); + } else if (kv.first == "password") { + result->password = StringValue::Get(kv.second); + } else { + throw InvalidInputException("Unknown named parameter: %s", kv.first); + } + } + + if (!result->security_protocol.empty() && (result->username.empty() || result->password.empty())) { + throw InvalidInputException("username and password are required when security_protocol is set"); + } - // Define the schema for our Kafka messages names = {"topic", "partition", "offset", "timestamp", "key", "value", "error"}; return_types = { LogicalType::VARCHAR, // topic @@ -75,7 +109,26 @@ static unique_ptr KafkaConsumerBind(ClientContext &context, LogicalType::VARCHAR // error }; - return make_uniq(brokers, topic, group_id); + try { + cppkafka::Configuration config; + config.set("metadata.broker.list", result->brokers); + config.set("group.id", result->group_id); + config.set("enable.auto.commit", false); + + if (!result->security_protocol.empty()) { + config.set("security.protocol", result->security_protocol); + config.set("sasl.mechanisms", result->sasl_mechanism.empty() ? "PLAIN" : result->sasl_mechanism); + config.set("sasl.username", result->username); + config.set("sasl.password", result->password); + } + + result->consumer = make_uniq(config); + result->consumer->subscribe({result->topic}); + } catch (const cppkafka::Exception &e) { + throw InvalidInputException("Failed to create Kafka consumer: %s", e.what()); + } + + return result; } static unique_ptr KafkaConsumerInit(ClientContext &context, @@ -87,53 +140,44 @@ static void KafkaConsumerFunction(ClientContext &context, TableFunctionInput &da DataChunk &output) { auto &data = data_p.bind_data->Cast(); - // Vector to store the messages for this chunk vector messages; - // Try to get messages up to the vector size idx_t chunk_size = 0; while (chunk_size < STANDARD_VECTOR_SIZE && data.running) { - auto msg = data.consumer->poll(std::chrono::milliseconds(100)); - if (!msg) { - continue; + try { + auto msg = data.consumer->poll(std::chrono::milliseconds(100)); + if (!msg) { + continue; + } + messages.push_back(std::move(msg)); + chunk_size++; + } catch (const cppkafka::Exception &e) { + throw InvalidInputException("Error polling Kafka: %s", e.what()); } - - messages.push_back(std::move(msg)); - chunk_size++; } if (chunk_size == 0) { - // No messages available output.SetCardinality(0); return; } - // Set up the output vectors output.SetCardinality(chunk_size); for (idx_t i = 0; i < chunk_size; i++) { const auto &msg = messages[i]; - // topic output.data[0].SetValue(i, Value(msg.get_topic())); - - // partition output.data[1].SetValue(i, Value::INTEGER(msg.get_partition())); - - // offset output.data[2].SetValue(i, Value::BIGINT(msg.get_offset())); - // timestamp auto ts = msg.get_timestamp(); if (ts) { - // Get milliseconds since epoch and convert to timestamp auto ms = 
             auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(ts.get().get_timestamp());
             output.data[3].SetValue(i, Value::TIMESTAMP(Timestamp::FromEpochMs(ms.count())));
         } else {
             FlatVector::SetNull(output.data[3], i, true);
         }
 
-        // key
         if (msg.get_key()) {
             string key_str(reinterpret_cast<const char *>(msg.get_key().get_data()),
                            msg.get_key().get_size());
@@ -142,7 +186,6 @@ static void KafkaConsumerFunction(ClientContext &context, TableFunctionInput &data_p,
             FlatVector::SetNull(output.data[4], i, true);
         }
 
-        // value and error
         if (!msg.get_error()) {
             string value_str(reinterpret_cast<const char *>(msg.get_payload().get_data()),
                              msg.get_payload().get_size());
@@ -153,25 +196,33 @@ static void KafkaConsumerFunction(ClientContext &context, TableFunctionInput &data_p,
             output.data[6].SetValue(i, Value(msg.get_error().to_string()));
         }
 
-        // Commit the message
-        data.consumer->commit(msg);
+        try {
+            data.consumer->commit(msg);
+        } catch (const cppkafka::Exception &e) {
+            throw InvalidInputException("Error committing message: %s", e.what());
+        }
     }
 }
 
-class KafquackExtension : public Extension { // Changed from KafkaExtension to KafquackExtension
+class KafquackExtension : public Extension {
 public:
     void Load(DuckDB &db) override {
-        // Register the Kafka consumer function
         vector<LogicalType> args = {
             LogicalType::VARCHAR,   // brokers
             LogicalType::VARCHAR,   // topic
             LogicalType::VARCHAR    // group_id
         };
 
+        named_parameter_type_map_t named_params = {
+            {"security_protocol", LogicalType::VARCHAR},
+            {"sasl_mechanism", LogicalType::VARCHAR},
+            {"username", LogicalType::VARCHAR},
+            {"password", LogicalType::VARCHAR}
+        };
+
         TableFunction kafka_consumer("kafka_consumer", args, KafkaConsumerFunction,
                                      KafkaConsumerBind, KafkaConsumerInit);
-
-        // Set parallel properties
+        kafka_consumer.named_parameters = named_params;
         kafka_consumer.projection_pushdown = false;
         kafka_consumer.filter_pushdown = false;
 
@@ -179,10 +230,9 @@ class KafquackExtension : public Extension {
     }
 
     std::string Name() override {
-        return "kafquack"; // Changed from "kafka" to "kafquack"
+        return "kafquack";
     }
 
-    // Add Version method implementation
     std::string Version() const override {
 #ifdef KAFQUACK_VERSION
         return KAFQUACK_VERSION;
@@ -195,12 +245,12 @@ class KafquackExtension : public Extension {
 } // namespace duckdb
 
 extern "C" {
-DUCKDB_EXTENSION_API void kafquack_init(duckdb::DatabaseInstance &db) { // Changed from kafka_init
+DUCKDB_EXTENSION_API void kafquack_init(duckdb::DatabaseInstance &db) {
     duckdb::DuckDB db_wrapper(db);
     db_wrapper.LoadExtension<duckdb::KafquackExtension>();
 }
 
-DUCKDB_EXTENSION_API const char *kafquack_version() { // Changed from kafka_version
+DUCKDB_EXTENSION_API const char *kafquack_version() {
     return duckdb::DuckDB::LibraryVersion();
 }
 }