Named Parameters #1

Merged
merged 1 commit on Dec 16, 2024

94 changes: 17 additions & 77 deletions README.md
@@ -1,86 +1,26 @@
# Kafquack
# DuckDB Kafka Extension

This repository is based on https://github.com/duckdb/extension-template; check it out if you want to build and ship your own DuckDB extension.
This extension allows you to consume Kafka messages through a DuckDB table function that connects to a Kafka broker and streams messages as a table.

---
> The extension is a WIP and not functional! Join if you're willing to contribute!

This extension, Kafquack, allows you to ... <extension_goal>.
## Examples


## Building
### Managing dependencies
DuckDB extensions use VCPKG for dependency management. Enabling VCPKG is straightforward: follow the [installation instructions](https://vcpkg.io/en/getting-started) or just run the following:
```shell
git clone https://github.com/Microsoft/vcpkg.git
./vcpkg/bootstrap-vcpkg.sh
export VCPKG_TOOLCHAIN_PATH=`pwd`/vcpkg/scripts/buildsystems/vcpkg.cmake
```
Note: VCPKG is only required for extensions that rely on it for dependency management. If you want to develop an extension without dependencies, or prefer to manage dependencies yourself, skip this step. Note that the example extension uses VCPKG to build with a dependency for instructive purposes, so if you skip this step the build may not work unless you remove that dependency.

### Build steps
Now to build the extension, run:
```sh
make
```
The main binaries that will be built are:
```sh
./build/release/duckdb
./build/release/test/unittest
./build/release/extension/kafquack/kafquack.duckdb_extension
```
- `duckdb` is the binary for the duckdb shell with the extension code automatically loaded.
- `unittest` is the test runner of duckdb. Again, the extension is already linked into the binary.
- `kafquack.duckdb_extension` is the loadable binary as it would be distributed.

## Running the extension
To run the extension code, simply start the shell with `./build/release/duckdb`.

Now we can use the features from the extension directly in DuckDB. The template contains a single scalar function `kafquack()` that takes a string argument and returns a string:
```
D select kafquack('Jane') as result;
┌───────────────┐
│ result │
│ varchar │
├───────────────┤
│ Kafquack Jane 🐥 │
└───────────────┘
```

## Running the tests
Several kinds of tests can be written for DuckDB extensions. The primary way to test an extension is the SQL tests in `./test/sql`. These SQL tests can be run using:
```sh
make test
```
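
A minimal sketch of what such a SQL test might look like for this extension's `kafka_consumer` function; the file name and the expected-failure case are assumptions for illustration, not files shipped in this PR:
```
# name: test/sql/kafquack.test
# group: [kafquack]

require kafquack

# Calling the function without the required broker, topic and group arguments
# should fail at bind time, before any connection to a broker is attempted.
statement error
SELECT * FROM kafka_consumer('localhost:9092');
```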

### Installing the deployed binaries
To install your extension binaries from S3, you will need to do two things. Firstly, DuckDB should be launched with the
`allow_unsigned_extensions` option set to true. How to set this will depend on the client you're using. Some examples:

CLI:
```shell
duckdb -unsigned
```

Python:
```python
con = duckdb.connect(':memory:', config={'allow_unsigned_extensions' : 'true'})
```

NodeJS:
```js
db = new duckdb.Database(':memory:', {"allow_unsigned_extensions": "true"});
```

Secondly, you will need to set the repository endpoint in DuckDB to the HTTP URL of your bucket plus the version of the extension
you want to install. To do this, run the following SQL query in DuckDB:
#### Basic usage:
```sql
SET custom_extension_repository='bucket.s3.eu-west-1.amazonaws.com/<your_extension_name>/latest';
SELECT * FROM kafka_consumer('localhost:9092', 'test-topic', 'test-group');
```
Note that the `/latest` path installs the latest extension version available for your current version of
DuckDB. To install a specific version, pass that version instead.
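
Purely as an illustration of the basic usage example above (not part of this change), a bounded sample of the stream could be materialized into a table; this assumes `LIMIT` ends the scan once enough rows have been produced rather than blocking indefinitely:
```sql
-- Hypothetical: capture a bounded sample of messages for offline inspection.
CREATE TABLE sampled_messages AS
SELECT *
FROM kafka_consumer('localhost:9092', 'test-topic', 'test-group')
LIMIT 100;
```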

After running these steps, you can install and load your extension using the regular INSTALL/LOAD commands in DuckDB:
#### Secure usage:

```sql
INSTALL kafquack
LOAD kafquack
SELECT * FROM kafka_consumer(
'broker.some.cloud:9092',
'test-topic',
'test-group',
security_protocol := 'SASL_SSL',
sasl_mechanism := 'PLAIN',
username := 'your-key',
password := 'your-secret'
);
```
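
As a further sketch (also not part of this PR), JSON payloads in the `value` column could be unpacked with DuckDB's `json` extension; the topic, group, and field names below are hypothetical:
```sql
-- Hypothetical: unpack JSON message payloads from the value column.
INSTALL json;
LOAD json;

SELECT
    "offset",
    json_extract_string(value, '$.user_id') AS user_id,
    json_extract_string(value, '$.event')   AS event
FROM kafka_consumer('localhost:9092', 'events', 'analytics-group')
LIMIT 50;
```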
152 changes: 101 additions & 51 deletions src/kafquack_extension.cpp
@@ -15,27 +15,38 @@ struct KafkaConsumerData : public TableFunctionData {
string brokers;
string topic;
string group_id;
string security_protocol;
string sasl_mechanism;
string username;
string password;
unique_ptr<cppkafka::Consumer> consumer;
bool running = true;

// Buffer for messages
std::mutex buffer_mutex;
std::vector<cppkafka::Message> message_buffer;
static constexpr size_t BUFFER_SIZE = 1000;

KafkaConsumerData(string brokers, string topic, string group_id)
: brokers(brokers), topic(topic), group_id(group_id) {

// Configure Kafka consumer
cppkafka::Configuration config = {
{ "metadata.broker.list", brokers },
{ "group.id", group_id },
{ "enable.auto.commit", false }
};
unique_ptr<FunctionData> Copy() const override {
auto result = make_uniq<KafkaConsumerData>();
result->brokers = brokers;
result->topic = topic;
result->group_id = group_id;
result->security_protocol = security_protocol;
result->sasl_mechanism = sasl_mechanism;
result->username = username;
result->password = password;
return result;
}

// Create consumer
consumer = make_uniq<cppkafka::Consumer>(config);
consumer->subscribe({topic});
bool Equals(const FunctionData &other_p) const override {
auto &other = other_p.Cast<KafkaConsumerData>();
return brokers == other.brokers &&
topic == other.topic &&
group_id == other.group_id &&
security_protocol == other.security_protocol &&
sasl_mechanism == other.sasl_mechanism &&
username == other.username &&
password == other.password;
}

~KafkaConsumerData() {
@@ -55,15 +66,38 @@ struct KafkaConsumerGlobalState : public GlobalTableFunctionState {
};

static unique_ptr<FunctionData> KafkaConsumerBind(ClientContext &context,
TableFunctionBindInput &input,
vector<LogicalType> &return_types,
vector<string> &names) {

auto brokers = input.inputs[0].GetValue<string>();
auto topic = input.inputs[1].GetValue<string>();
auto group_id = input.inputs[2].GetValue<string>();
TableFunctionBindInput &input,
vector<LogicalType> &return_types,
vector<string> &names) {
auto result = make_uniq<KafkaConsumerData>();
result->brokers = input.inputs[0].GetValue<string>();
result->topic = input.inputs[1].GetValue<string>();
result->group_id = input.inputs[2].GetValue<string>();

for (auto &kv : input.named_parameters) {
if (kv.first == "security_protocol") {
result->security_protocol = StringValue::Get(kv.second);
if (result->security_protocol != "SASL_SSL" && result->security_protocol != "SASL_PLAINTEXT") {
throw InvalidInputException("security_protocol must be either SASL_SSL or SASL_PLAINTEXT");
}
} else if (kv.first == "sasl_mechanism") {
result->sasl_mechanism = StringValue::Get(kv.second);
if (result->sasl_mechanism != "SCRAM-SHA-256" && result->sasl_mechanism != "PLAIN") {
throw InvalidInputException("sasl_mechanism must be either SCRAM-SHA-256 or PLAIN");
}
} else if (kv.first == "username") {
result->username = StringValue::Get(kv.second);
} else if (kv.first == "password") {
result->password = StringValue::Get(kv.second);
} else {
throw InvalidInputException("Unknown named parameter: %s", kv.first);
}
}

if (!result->security_protocol.empty() && (result->username.empty() || result->password.empty())) {
throw InvalidInputException("username and password are required when security_protocol is set");
}

// Define the schema for our Kafka messages
names = {"topic", "partition", "offset", "timestamp", "key", "value", "error"};
return_types = {
LogicalType::VARCHAR, // topic
@@ -75,7 +109,26 @@ static unique_ptr<FunctionData> KafkaConsumerBind(ClientContext &context,
LogicalType::VARCHAR // error
};

return make_uniq<KafkaConsumerData>(brokers, topic, group_id);
try {
cppkafka::Configuration config;
config.set("metadata.broker.list", result->brokers);
config.set("group.id", result->group_id);
config.set("enable.auto.commit", false);

if (!result->security_protocol.empty()) {
config.set("security.protocol", result->security_protocol);
config.set("sasl.mechanisms", result->sasl_mechanism.empty() ? "PLAIN" : result->sasl_mechanism);
config.set("sasl.username", result->username);
config.set("sasl.password", result->password);
}

result->consumer = make_uniq<cppkafka::Consumer>(config);
result->consumer->subscribe({result->topic});
} catch (const cppkafka::Exception &e) {
throw InvalidInputException("Failed to create Kafka consumer: %s", e.what());
}

return result;
}

static unique_ptr<GlobalTableFunctionState> KafkaConsumerInit(ClientContext &context,
@@ -87,53 +140,44 @@ static void KafkaConsumerFunction(ClientContext &context, TableFunctionInput &da
DataChunk &output) {
auto &data = data_p.bind_data->Cast<KafkaConsumerData>();

// Vector to store the messages for this chunk
vector<cppkafka::Message> messages;

// Try to get messages up to the vector size
idx_t chunk_size = 0;
while (chunk_size < STANDARD_VECTOR_SIZE && data.running) {
auto msg = data.consumer->poll(std::chrono::milliseconds(100));
if (!msg) {
continue;
try {
auto msg = data.consumer->poll(std::chrono::milliseconds(100));
if (!msg) {
continue;
}
messages.push_back(std::move(msg));
chunk_size++;
} catch (const cppkafka::Exception &e) {
throw InvalidInputException("Error polling Kafka: %s", e.what());
}

messages.push_back(std::move(msg));
chunk_size++;
}

if (chunk_size == 0) {
// No messages available
output.SetCardinality(0);
return;
}

// Set up the output vectors
output.SetCardinality(chunk_size);

for (idx_t i = 0; i < chunk_size; i++) {
const auto &msg = messages[i];

// topic
output.data[0].SetValue(i, Value(msg.get_topic()));

// partition
output.data[1].SetValue(i, Value::INTEGER(msg.get_partition()));

// offset
output.data[2].SetValue(i, Value::BIGINT(msg.get_offset()));

// timestamp
auto ts = msg.get_timestamp();
if (ts) {
// Get milliseconds since epoch and convert to timestamp
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(ts.get().get_timestamp());
output.data[3].SetValue(i, Value::TIMESTAMP(Timestamp::FromEpochMs(ms.count())));
} else {
FlatVector::SetNull(output.data[3], i, true);
}

// key
if (msg.get_key()) {
string key_str(reinterpret_cast<const char*>(msg.get_key().get_data()),
msg.get_key().get_size());
@@ -142,7 +186,6 @@ static void KafkaConsumerFunction(ClientContext &context, TableFunctionInput &da
FlatVector::SetNull(output.data[4], i, true);
}

// value and error
if (!msg.get_error()) {
string value_str(reinterpret_cast<const char*>(msg.get_payload().get_data()),
msg.get_payload().get_size());
@@ -153,36 +196,43 @@ static void KafkaConsumerFunction(ClientContext &context, TableFunctionInput &da
output.data[6].SetValue(i, Value(msg.get_error().to_string()));
}

// Commit the message
data.consumer->commit(msg);
try {
data.consumer->commit(msg);
} catch (const cppkafka::Exception &e) {
throw InvalidInputException("Error committing message: %s", e.what());
}
}
}

class KafquackExtension : public Extension { // Changed from KafkaExtension to KafquackExtension
class KafquackExtension : public Extension {
public:
void Load(DuckDB &db) override {
// Register the Kafka consumer function
vector<LogicalType> args = {
LogicalType::VARCHAR, // brokers
LogicalType::VARCHAR, // topic
LogicalType::VARCHAR // group_id
};

named_parameter_type_map_t named_params = {
{"security_protocol", LogicalType::VARCHAR},
{"sasl_mechanism", LogicalType::VARCHAR},
{"username", LogicalType::VARCHAR},
{"password", LogicalType::VARCHAR}
};

TableFunction kafka_consumer("kafka_consumer", args, KafkaConsumerFunction,
KafkaConsumerBind, KafkaConsumerInit);

// Set parallel properties
kafka_consumer.named_parameters = named_params;
kafka_consumer.projection_pushdown = false;
kafka_consumer.filter_pushdown = false;

ExtensionUtil::RegisterFunction(*db.instance, kafka_consumer);
}

std::string Name() override {
return "kafquack"; // Changed from "kafka" to "kafquack"
return "kafquack";
}

// Add Version method implementation
std::string Version() const override {
#ifdef KAFQUACK_VERSION
return KAFQUACK_VERSION;
@@ -195,12 +245,12 @@ class KafquackExtension : public Extension { // Changed from KafkaExtension to
} // namespace duckdb

extern "C" {
DUCKDB_EXTENSION_API void kafquack_init(duckdb::DatabaseInstance &db) { // Changed from kafka_init
DUCKDB_EXTENSION_API void kafquack_init(duckdb::DatabaseInstance &db) {
duckdb::DuckDB db_wrapper(db);
db_wrapper.LoadExtension<duckdb::KafquackExtension>();
}

DUCKDB_EXTENSION_API const char *kafquack_version() { // Changed from kafka_version
DUCKDB_EXTENSION_API const char *kafquack_version() {
return duckdb::DuckDB::LibraryVersion();
}
}