Skip to content

Commit 290ceeb

Browse files
committed
bucket notification - move kafka options into object. Also add missing call to destroy() (#8889)
Signed-off-by: Amit Prinz Setter <alphaprinz@gmail.com>
1 parent ec065be commit 290ceeb

File tree

4 files changed

+46
-14
lines changed

4 files changed

+46
-14
lines changed

docs/NooBaaNonContainerized/NooBaaCLI.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,16 @@ noobaa-cli connection add --from_file
569569
- Type: Object
570570
- Description: An object given as options to node http(s) request. If "auth" field is specified, it's value is encrypted.
571571

572+
- `kafka_options_object`
573+
- Type: Object
574+
- Description: An object given as options to kafka client.
575+
Options are listed in https://github.com/edenhill/librdkafka/blob/v2.8.0/CONFIGURATION.md.
576+
Specifically, 'metadata.broker.list' is used to specify the external kafka server.
577+
578+
- `topic`
579+
- Type: String
580+
- Description - Topic for kafka messages.
581+
572582
- `from_file`
573583
- Type: String
574584
- Description: Path to a JSON file which includes connection properties. When using `from_file` flag the connection details must only appear inside the options JSON file. See example below.

src/manage_nsfs/manage_nsfs_constants.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ const VALID_OPTIONS_UPGRADE = {
8989
const VALID_OPTIONS_NOTIFICATION = {};
9090

9191
const VALID_OPTIONS_CONNECTION = {
92-
'add': new Set(['name', 'notification_protocol', 'agent_request_object', 'request_options_object', FROM_FILE, ...CLI_MUTUAL_OPTIONS]),
92+
'add': new Set(['name', 'notification_protocol', 'agent_request_object', 'request_options_object', 'topic', 'kafka_options_object', FROM_FILE, ...CLI_MUTUAL_OPTIONS]),
9393
'update': new Set(['name', 'key', 'value', 'remove_key', ...CLI_MUTUAL_OPTIONS]),
9494
'delete': new Set(['name', ...CLI_MUTUAL_OPTIONS]),
9595
'list': new Set(CLI_MUTUAL_OPTIONS),
@@ -163,6 +163,8 @@ const OPTION_TYPE = {
163163
notification_protocol: 'string',
164164
agent_request_object: 'string',
165165
request_options_object: 'string',
166+
kafka_options_object: 'string',
167+
topic: 'string',
166168
decrypt: 'boolean',
167169
key: 'string',
168170
value: 'string',

src/test/unit_tests/jest_tests/test_nc_nsfs_connection_cli.test.js

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,28 @@ describe('manage nsfs cli connection flow', () => {
6464
conn_options.name = "fromcli";
6565
const res = await exec_manage_cli(TYPES.CONNECTION, ACTIONS.ADD, conn_options);
6666
const res_json = JSON.parse(res.trim());
67-
expect(_.isEqual(new Set(Object.keys(res_json.response)), new Set(['reply', 'code']))).toBe(true);
67+
expect(_.isEqual(new Set(Object.keys(res_json.response)), new Set(['reply', 'code', 'message']))).toBe(true);
6868
const connection = await config_fs.get_connection_by_name(conn_options.name);
6969
assert_connection(connection, conn_options, true);
7070
}, timeout);
7171

72+
it('cli create connection from cli - kafka', async () => {
73+
const conn_options = {
74+
config_root,
75+
notification_protocol: 'kafka',
76+
topic: 'mytopic',
77+
kafka_options_object: {
78+
'metadata.broker.list': 'localhost:9092'
79+
}
80+
};
81+
conn_options.name = "fromcli";
82+
const res = await exec_manage_cli(TYPES.CONNECTION, ACTIONS.ADD, conn_options);
83+
const res_json = JSON.parse(res.trim());
84+
expect(_.isEqual(new Set(Object.keys(res_json.response)), new Set(['reply', 'code', 'message']))).toBe(true);
85+
const connection = await config_fs.get_connection_by_name(conn_options.name);
86+
assert_connection(connection, conn_options, false, true);
87+
}, timeout);
88+
7289
it('cli delete connection ', async () => {
7390
await exec_manage_cli(TYPES.CONNECTION, ACTIONS.DELETE, {config_root, name: defaults.name});
7491
expect(fs.readdirSync(config_fs.connections_dir_path).filter(file => file.endsWith(".json")).length).toEqual(0);
@@ -131,15 +148,21 @@ describe('manage nsfs cli connection flow', () => {
131148
* @param {object} connection actual
132149
* @param {object} connection_options expected
133150
* @param {boolean} is_encrypted whether connection's auth field is encrypted
151+
* @param {boolean} is_kafka check for kafka fields (defualt is for http fields)
134152
*/
135-
function assert_connection(connection, connection_options, is_encrypted) {
153+
function assert_connection(connection, connection_options, is_encrypted, is_kafka) {
136154
expect(connection.name).toEqual(connection_options.name);
137155
expect(connection.notification_protocol).toEqual(connection_options.notification_protocol);
138-
expect(connection.agent_request_object).toStrictEqual(connection_options.agent_request_object);
139-
if (is_encrypted) {
140-
expect(connection.request_options_object).not.toStrictEqual(connection_options.request_options_object);
156+
if (is_kafka) {
157+
expect(connection.topic).toStrictEqual(connection_options.topic);
158+
expect(connection.kafka_options_object).toStrictEqual(connection_options.kafka_options_object);
141159
} else {
142-
expect(connection.request_options_object).toStrictEqual(connection_options.request_options_object);
160+
expect(connection.agent_request_object).toStrictEqual(connection_options.agent_request_object);
161+
if (is_encrypted) {
162+
expect(connection.request_options_object).not.toStrictEqual(connection_options.request_options_object);
163+
} else {
164+
expect(connection.request_options_object).toStrictEqual(connection_options.request_options_object);
165+
}
143166
}
144167
}
145168

src/util/notifications_util.js

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ class Notificator {
104104
} finally {
105105
await log.close();
106106
this.notif_to_connect.clear();
107+
for (const conn of this.connect_str_to_connection.values()) {
108+
conn.destroy();
109+
}
107110
}
108111
}
109112
}
@@ -255,13 +258,7 @@ class KafkaNotificator {
255258
}
256259

257260
async connect() {
258-
//kafka client doens't like options it's not familiar with
259-
//so delete them before connecting
260-
const connect_for_kafka = structuredClone(this.connect_obj);
261-
delete connect_for_kafka.topic;
262-
delete connect_for_kafka.notification_protocol;
263-
delete connect_for_kafka.name;
264-
this.connection = new Kafka.HighLevelProducer(connect_for_kafka);
261+
this.connection = new Kafka.HighLevelProducer(this.connect_obj.kafka_options_object);
265262
await new Promise((res, rej) => {
266263
this.connection.on('ready', () => {
267264
res();

0 commit comments

Comments
 (0)