
Commit af67646: named params

1 parent: 042e69f

2 files changed: +118 −128 lines

README.md

Lines changed: 17 additions & 77 deletions
````diff
@@ -1,86 +1,26 @@
-# Kafquack
+# DuckDB Kafka Extension
 
-This repository is based on https://github.com/duckdb/extension-template; check it out if you want to build and ship your own DuckDB extension.
+This extension allows you to consume Kafka messages through a DuckDB table function that connects to a Kafka broker and streams messages as a table.
 
----
+> The extension is a WIP and not yet functional! Join if you're willing to contribute!
 
-This extension, Kafquack, allows you to ... <extension_goal>.
+## Examples
 
-
-## Building
-### Managing dependencies
-DuckDB extensions use VCPKG for dependency management. Enabling VCPKG is very simple: follow the [installation instructions](https://vcpkg.io/en/getting-started) or just run the following:
-```shell
-git clone https://github.com/Microsoft/vcpkg.git
-./vcpkg/bootstrap-vcpkg.sh
-export VCPKG_TOOLCHAIN_PATH=`pwd`/vcpkg/scripts/buildsystems/vcpkg.cmake
-```
-Note: VCPKG is only required for extensions that want to rely on it for dependency management. If you want to develop an extension without dependencies, or want to do your own dependency management, just skip this step. Note that the example extension uses VCPKG to build with a dependency for instructive purposes, so when skipping this step the build may not work without removing the dependency.
-
-### Build steps
-Now to build the extension, run:
-```sh
-make
-```
-The main binaries that will be built are:
-```sh
-./build/release/duckdb
-./build/release/test/unittest
-./build/release/extension/kafquack/kafquack.duckdb_extension
-```
-- `duckdb` is the binary for the duckdb shell with the extension code automatically loaded.
-- `unittest` is the test runner of duckdb. Again, the extension is already linked into the binary.
-- `kafquack.duckdb_extension` is the loadable binary as it would be distributed.
-
-## Running the extension
-To run the extension code, simply start the shell with `./build/release/duckdb`.
-
-Now we can use the features from the extension directly in DuckDB. The template contains a single scalar function `kafquack()` that takes a string argument and returns a string:
-```
-D select kafquack('Jane') as result;
-┌──────────────────┐
-│      result      │
-│     varchar      │
-├──────────────────┤
-│ Kafquack Jane 🐥 │
-└──────────────────┘
-```
-
-## Running the tests
-Different tests can be created for DuckDB extensions. The primary way of testing DuckDB extensions should be the SQL tests in `./test/sql`. These SQL tests can be run using:
-```sh
-make test
-```
-
-### Installing the deployed binaries
-To install your extension binaries from S3, you will need to do two things. Firstly, DuckDB should be launched with the
-`allow_unsigned_extensions` option set to true. How to set this will depend on the client you're using. Some examples:
-
-CLI:
-```shell
-duckdb -unsigned
-```
-
-Python:
-```python
-con = duckdb.connect(':memory:', config={'allow_unsigned_extensions' : 'true'})
-```
-
-NodeJS:
-```js
-db = new duckdb.Database(':memory:', {"allow_unsigned_extensions": "true"});
-```
-
-Secondly, you will need to set the repository endpoint in DuckDB to the HTTP URL of your bucket + version of the extension
-you want to install. To do this run the following SQL query in DuckDB:
+#### Basic usage:
 ```sql
-SET custom_extension_repository='bucket.s3.eu-west-1.amazonaws.com/<your_extension_name>/latest';
+SELECT * FROM kafka_consumer('localhost:9092', 'test-topic', 'test-group');
 ```
-Note that the `/latest` path will allow you to install the latest extension version available for your current version of
-DuckDB. To specify a specific version, you can pass the version instead.
 
-After running these steps, you can install and load your extension using the regular INSTALL/LOAD commands in DuckDB:
+#### Secure usage:
+
 ```sql
-INSTALL kafquack
-LOAD kafquack
+SELECT * FROM kafka_consumer(
+    'broker.some.cloud:9092',
+    'test-topic',
+    'test-group',
+    security_protocol := 'SASL_SSL',
+    sasl_mechanism := 'PLAIN',
+    username := 'your-key',
+    password := 'your-secret'
+);
 ```
````
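Worth noting alongside the secure example: in the code change below, when `security_protocol` is set but `sasl_mechanism` is omitted, the consumer configuration falls back to `PLAIN`. A terser variant of the secure call (a sketch reusing the placeholder credentials above) should therefore behave identically:

```sql
SELECT * FROM kafka_consumer(
    'broker.some.cloud:9092',
    'test-topic',
    'test-group',
    security_protocol := 'SASL_SSL',
    -- sasl_mechanism omitted: the bind logic below defaults it to 'PLAIN'
    username := 'your-key',
    password := 'your-secret'
);
```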

src/kafquack_extension.cpp

Lines changed: 101 additions & 51 deletions
```diff
@@ -15,27 +15,38 @@ struct KafkaConsumerData : public TableFunctionData {
     string brokers;
     string topic;
     string group_id;
+    string security_protocol;
+    string sasl_mechanism;
+    string username;
+    string password;
     unique_ptr<cppkafka::Consumer> consumer;
     bool running = true;
 
-    // Buffer for messages
     std::mutex buffer_mutex;
     std::vector<cppkafka::Message> message_buffer;
     static constexpr size_t BUFFER_SIZE = 1000;
 
-    KafkaConsumerData(string brokers, string topic, string group_id)
-        : brokers(brokers), topic(topic), group_id(group_id) {
-
-        // Configure Kafka consumer
-        cppkafka::Configuration config = {
-            { "metadata.broker.list", brokers },
-            { "group.id", group_id },
-            { "enable.auto.commit", false }
-        };
+    unique_ptr<FunctionData> Copy() const override {
+        auto result = make_uniq<KafkaConsumerData>();
+        result->brokers = brokers;
+        result->topic = topic;
+        result->group_id = group_id;
+        result->security_protocol = security_protocol;
+        result->sasl_mechanism = sasl_mechanism;
+        result->username = username;
+        result->password = password;
+        return result;
+    }
 
-        // Create consumer
-        consumer = make_uniq<cppkafka::Consumer>(config);
-        consumer->subscribe({topic});
+    bool Equals(const FunctionData &other_p) const override {
+        auto &other = other_p.Cast<KafkaConsumerData>();
+        return brokers == other.brokers &&
+               topic == other.topic &&
+               group_id == other.group_id &&
+               security_protocol == other.security_protocol &&
+               sasl_mechanism == other.sasl_mechanism &&
+               username == other.username &&
+               password == other.password;
     }
 
     ~KafkaConsumerData() {
```
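For context on the two overrides introduced above: `Copy` and `Equals` are virtual hooks on DuckDB's `FunctionData` that the planner uses when it duplicates or compares table-function bind data. A simplified sketch of the contract being satisfied (paraphrased, not DuckDB's exact declaration):

```cpp
// Paraphrased sketch of the duckdb::FunctionData interface these overrides satisfy:
struct FunctionData {
    virtual ~FunctionData() = default;
    virtual unique_ptr<FunctionData> Copy() const = 0;        // duplicate bind data
    virtual bool Equals(const FunctionData &other) const = 0; // structural comparison
};
```

Note that the `Copy` above duplicates only the connection parameters; the live `cppkafka::Consumer` handle is deliberately not cloned.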
```diff
@@ -55,15 +66,38 @@ struct KafkaConsumerGlobalState : public GlobalTableFunctionState {
 };
 
 static unique_ptr<FunctionData> KafkaConsumerBind(ClientContext &context,
-                                                TableFunctionBindInput &input,
-                                                vector<LogicalType> &return_types,
-                                                vector<string> &names) {
-
-    auto brokers = input.inputs[0].GetValue<string>();
-    auto topic = input.inputs[1].GetValue<string>();
-    auto group_id = input.inputs[2].GetValue<string>();
+                                                  TableFunctionBindInput &input,
+                                                  vector<LogicalType> &return_types,
+                                                  vector<string> &names) {
+    auto result = make_uniq<KafkaConsumerData>();
+    result->brokers = input.inputs[0].GetValue<string>();
+    result->topic = input.inputs[1].GetValue<string>();
+    result->group_id = input.inputs[2].GetValue<string>();
+
+    for (auto &kv : input.named_parameters) {
+        if (kv.first == "security_protocol") {
+            result->security_protocol = StringValue::Get(kv.second);
+            if (result->security_protocol != "SASL_SSL" && result->security_protocol != "SASL_PLAINTEXT") {
+                throw InvalidInputException("security_protocol must be either SASL_SSL or SASL_PLAINTEXT");
+            }
+        } else if (kv.first == "sasl_mechanism") {
+            result->sasl_mechanism = StringValue::Get(kv.second);
+            if (result->sasl_mechanism != "SCRAM-SHA-256" && result->sasl_mechanism != "PLAIN") {
+                throw InvalidInputException("sasl_mechanism must be either SCRAM-SHA-256 or PLAIN");
+            }
+        } else if (kv.first == "username") {
+            result->username = StringValue::Get(kv.second);
+        } else if (kv.first == "password") {
+            result->password = StringValue::Get(kv.second);
+        } else {
+            throw InvalidInputException("Unknown named parameter: %s", kv.first);
+        }
+    }
+
+    if (!result->security_protocol.empty() && (result->username.empty() || result->password.empty())) {
+        throw InvalidInputException("username and password are required when security_protocol is set");
+    }
 
-    // Define the schema for our Kafka messages
     names = {"topic", "partition", "offset", "timestamp", "key", "value", "error"};
     return_types = {
         LogicalType::VARCHAR, // topic
```
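The named-parameter loop above validates the security settings at bind time, so malformed calls fail before any connection attempt. Illustrative rejections (hypothetical broker/topic values; the error text comes from the exceptions above):

```sql
-- Rejected: unsupported protocol value
SELECT * FROM kafka_consumer('localhost:9092', 'test-topic', 'test-group',
                             security_protocol := 'SSL');
-- Invalid Input Error: security_protocol must be either SASL_SSL or SASL_PLAINTEXT

-- Rejected: protocol set without credentials
SELECT * FROM kafka_consumer('localhost:9092', 'test-topic', 'test-group',
                             security_protocol := 'SASL_SSL');
-- Invalid Input Error: username and password are required when security_protocol is set
```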
```diff
@@ -75,7 +109,26 @@ static unique_ptr<FunctionData> KafkaConsumerBind(ClientContext &context,
         LogicalType::VARCHAR // error
     };
 
-    return make_uniq<KafkaConsumerData>(brokers, topic, group_id);
+    try {
+        cppkafka::Configuration config;
+        config.set("metadata.broker.list", result->brokers);
+        config.set("group.id", result->group_id);
+        config.set("enable.auto.commit", false);
+
+        if (!result->security_protocol.empty()) {
+            config.set("security.protocol", result->security_protocol);
+            config.set("sasl.mechanisms", result->sasl_mechanism.empty() ? "PLAIN" : result->sasl_mechanism);
+            config.set("sasl.username", result->username);
+            config.set("sasl.password", result->password);
+        }
+
+        result->consumer = make_uniq<cppkafka::Consumer>(config);
+        result->consumer->subscribe({result->topic});
+    } catch (const cppkafka::Exception &e) {
+        throw InvalidInputException("Failed to create Kafka consumer: %s", e.what());
+    }
+
+    return result;
 }
 
 static unique_ptr<GlobalTableFunctionState> KafkaConsumerInit(ClientContext &context,
```
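cppkafka forwards these keys to librdkafka, so the bind-time setup corresponds to the following client properties (a sketch for cross-checking against a non-DuckDB client such as kcat; values borrowed from the README example, and the cppkafka-to-librdkafka mapping is assumed):

```properties
# Assumed equivalent librdkafka client properties
metadata.broker.list=broker.some.cloud:9092
group.id=test-group
enable.auto.commit=false
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN       # the fallback when sasl_mechanism is omitted
sasl.username=your-key
sasl.password=your-secret
```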
```diff
@@ -87,53 +140,44 @@ static void KafkaConsumerFunction(ClientContext &context, TableFunctionInput &data_p,
                                   DataChunk &output) {
     auto &data = data_p.bind_data->Cast<KafkaConsumerData>();
 
-    // Vector to store the messages for this chunk
     vector<cppkafka::Message> messages;
 
-    // Try to get messages up to the vector size
     idx_t chunk_size = 0;
     while (chunk_size < STANDARD_VECTOR_SIZE && data.running) {
-        auto msg = data.consumer->poll(std::chrono::milliseconds(100));
-        if (!msg) {
-            continue;
+        try {
+            auto msg = data.consumer->poll(std::chrono::milliseconds(100));
+            if (!msg) {
+                continue;
+            }
+            messages.push_back(std::move(msg));
+            chunk_size++;
+        } catch (const cppkafka::Exception &e) {
+            throw InvalidInputException("Error polling Kafka: %s", e.what());
         }
-
-        messages.push_back(std::move(msg));
-        chunk_size++;
     }
 
     if (chunk_size == 0) {
-        // No messages available
         output.SetCardinality(0);
         return;
     }
 
-    // Set up the output vectors
     output.SetCardinality(chunk_size);
 
     for (idx_t i = 0; i < chunk_size; i++) {
         const auto &msg = messages[i];
 
-        // topic
         output.data[0].SetValue(i, Value(msg.get_topic()));
-
-        // partition
         output.data[1].SetValue(i, Value::INTEGER(msg.get_partition()));
-
-        // offset
         output.data[2].SetValue(i, Value::BIGINT(msg.get_offset()));
 
-        // timestamp
         auto ts = msg.get_timestamp();
         if (ts) {
-            // Get milliseconds since epoch and convert to timestamp
             auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(ts.get().get_timestamp());
            output.data[3].SetValue(i, Value::TIMESTAMP(Timestamp::FromEpochMs(ms.count())));
         } else {
             FlatVector::SetNull(output.data[3], i, true);
         }
 
-        // key
         if (msg.get_key()) {
             string key_str(reinterpret_cast<const char*>(msg.get_key().get_data()),
                            msg.get_key().get_size());
```
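The scan fills at most one vector per call (STANDARD_VECTOR_SIZE rows, 2048 in a default DuckDB build) and populates the schema declared in `KafkaConsumerBind`. The middle column types are inferred from the `Value` constructors used above; a hypothetical schema inspection:

```sql
DESCRIBE SELECT * FROM kafka_consumer('localhost:9092', 'test-topic', 'test-group');
-- topic     VARCHAR
-- partition INTEGER
-- offset    BIGINT
-- timestamp TIMESTAMP
-- key       VARCHAR
-- value     VARCHAR
-- error     VARCHAR
```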
```diff
@@ -142,7 +186,6 @@ static void KafkaConsumerFunction(ClientContext &context, TableFunctionInput &data_p,
             FlatVector::SetNull(output.data[4], i, true);
         }
 
-        // value and error
         if (!msg.get_error()) {
             string value_str(reinterpret_cast<const char*>(msg.get_payload().get_data()),
                              msg.get_payload().get_size());
@@ -153,36 +196,43 @@ static void KafkaConsumerFunction(ClientContext &context, TableFunctionInput &data_p,
             output.data[6].SetValue(i, Value(msg.get_error().to_string()));
         }
 
-        // Commit the message
-        data.consumer->commit(msg);
+        try {
+            data.consumer->commit(msg);
+        } catch (const cppkafka::Exception &e) {
+            throw InvalidInputException("Error committing message: %s", e.what());
+        }
     }
 }
 
-class KafquackExtension : public Extension { // Changed from KafkaExtension to KafquackExtension
+class KafquackExtension : public Extension {
 public:
     void Load(DuckDB &db) override {
-        // Register the Kafka consumer function
         vector<LogicalType> args = {
             LogicalType::VARCHAR, // brokers
             LogicalType::VARCHAR, // topic
             LogicalType::VARCHAR  // group_id
         };
 
+        named_parameter_type_map_t named_params = {
+            {"security_protocol", LogicalType::VARCHAR},
+            {"sasl_mechanism", LogicalType::VARCHAR},
+            {"username", LogicalType::VARCHAR},
+            {"password", LogicalType::VARCHAR}
+        };
+
         TableFunction kafka_consumer("kafka_consumer", args, KafkaConsumerFunction,
                                      KafkaConsumerBind, KafkaConsumerInit);
-
-        // Set parallel properties
+        kafka_consumer.named_parameters = named_params;
         kafka_consumer.projection_pushdown = false;
         kafka_consumer.filter_pushdown = false;
 
        ExtensionUtil::RegisterFunction(*db.instance, kafka_consumer);
     }
 
     std::string Name() override {
-        return "kafquack"; // Changed from "kafka" to "kafquack"
+        return "kafquack";
     }
 
-    // Add Version method implementation
     std::string Version() const override {
 #ifdef KAFQUACK_VERSION
         return KAFQUACK_VERSION;
@@ -195,12 +245,12 @@ class KafquackExtension : public Extension {
 } // namespace duckdb
 
 extern "C" {
-DUCKDB_EXTENSION_API void kafquack_init(duckdb::DatabaseInstance &db) { // Changed from kafka_init
+DUCKDB_EXTENSION_API void kafquack_init(duckdb::DatabaseInstance &db) {
     duckdb::DuckDB db_wrapper(db);
     db_wrapper.LoadExtension<duckdb::KafquackExtension>();
 }
 
-DUCKDB_EXTENSION_API const char *kafquack_version() { // Changed from kafka_version
+DUCKDB_EXTENSION_API const char *kafquack_version() {
     return duckdb::DuckDB::LibraryVersion();
 }
 }
```
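For a local smoke test once the extension builds, the loading steps documented in the pre-rewrite README still apply (a sketch; the path assumes the extension-template build layout, and `-unsigned` is required because locally built extensions are unsigned):

```sql
-- In a shell started with: duckdb -unsigned
LOAD './build/release/extension/kafquack/kafquack.duckdb_extension';
SELECT * FROM kafka_consumer('localhost:9092', 'test-topic', 'test-group');
```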
