Skip to content

Commit 7a5a52e

Browse files
authored
Adding Kafka options: "enable.ssl.certificate.verification" & "ssl.key.password" (#29)
* adding kafka opt enable_ssl_certificate_verification ... * adding kafka opts - ssl_key_password & enable_ssl_certificate_verification ...
1 parent 743fd1a commit 7a5a52e

File tree

6 files changed

+87
-6
lines changed

6 files changed

+87
-6
lines changed

doc/CONFIG-KEYS

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ makes the whole line to be ignored by the interpreter, making it a comment.
88

99
KEY: iface
1010
DESC: Defining the network interface receiving the gRPC data-stream. The logical name of the interface can be retrived using shell commads
11-
like 'ip address'. No default value is set and a coherent value is mandatory.
11+
like "ip address". No default value is set and a coherent value is mandatory.
1212
DEFAULT: none
1313

1414
KEY: ipv4_socket_cisco
@@ -134,6 +134,13 @@ DESC: When set to true, the producer will ensure that messages are successf
134134
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
135135
DEFAULT: "true"
136136

137+
KEY: enable_ssl_certificate_verification
138+
VALUES: ["true" or "false"]
139+
DESC: This is valid only when security_protocol is set to "ssl" and it is enabling/disabling the OpenSSL's builtin broker (server)
140+
certificate verification.
141+
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
142+
DEFAULT: "true"
143+
137144
KEY: log_level
138145
VALUES: [value >= "0" and value <= "7"]
139146
DESC: Logging level (syslog(3) levels)
@@ -148,23 +155,28 @@ DESC: Defines if the communication between the collector and the Kafka's br
148155
DEFAULT: none
149156

150157
KEY: ssl_ca_location
151-
DESC: This is valid only when security_protocol is set to "ssl" and it's including the file or
158+
DESC: This is valid and mandatory only when security_protocol is set to "ssl" and it's including the file or
152159
directory path to CA certificate(s) for verifying the broker's key
153160
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
154161
DEFAULT: none
155162

156163
KEY: ssl_certificate_location
157-
DESC: This is valid only when security_protocol is set to "ssl" and it's including the path to
164+
DESC: This is valid and mandatory only when security_protocol is set to "ssl" and it's including the path to
158165
client's public key (PEM) used for authentication
159166
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
160167
DEFAULT: none
161168

162169
KEY: ssl_key_location
163-
DESC: This is valid only when security_protocol is set to "ssl" and it's including the path to
170+
DESC: This is valid and mandatory only when security_protocol is set to "ssl" and it's including the path to
164171
client's private key (PEM) used for authentication.
165172
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
166173
DEFAULT: none
167174

175+
KEY: ssl_key_password
176+
DESC: This is valid only when security_protocol is set to "ssl" and it is including the client's private key passphrase
177+
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
178+
DEFAULT: none
179+
168180
KEY: topic
169181
DESC: Defines the Kafka's topic name where the processed gRPC messages are delivered. This is a mandatory option and
170182
when the selected delivery method is "zmq" it's automatically set to "dummy_topic".

doc/Changelog

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ The keys used are:
77
!: fixed/modified feature, -: deleted feature, +: new feature
88

99

10-
current (main branch) -- 29-05-2024
10+
current (main branch) -- 04-06-2024
1111
+ Adding the ability to disable the checks related to socket binding to a particular device
12+
+ Adding the ability to configure the Kafka option "enable.ssl.certificate.verification". Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
13+
+ Adding the ability to configure the Kafka option "ssl.key.password". Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
1214

1315
v1.1.4 -- 05-12-2023
1416
+ Adding automatic version number retrieval from the VERSION file

src/dataDelivery/kafka_delivery.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@ KafkaDelivery::KafkaDelivery()
2121
kafka_delivery_cfg_parameters.at("security_protocol");
2222
this->ssl_key_location =
2323
kafka_delivery_cfg_parameters.at("ssl_key_location");
24+
this->ssl_key_password =
25+
kafka_delivery_cfg_parameters.at("ssl_key_password");
2426
this->ssl_certificate_location =
2527
kafka_delivery_cfg_parameters.at("ssl_certificate_location");
2628
this->ssl_ca_location =
2729
kafka_delivery_cfg_parameters.at("ssl_ca_location");
30+
this->enable_ssl_certificate_verification =
31+
kafka_delivery_cfg_parameters.at("enable_ssl_certificate_verification");
2832
this->log_level =
2933
kafka_delivery_cfg_parameters.at("log_level");
3034

@@ -38,8 +42,10 @@ void KafkaDelivery::set_kafka_properties(kafka::Properties &properties)
3842
properties.put("client.id", get_client_id());
3943
properties.put("security.protocol", get_security_protocol());
4044
properties.put("ssl.key.location", get_ssl_key_location());
45+
properties.put("ssl.key.password", get_ssl_key_password());
4146
properties.put("ssl.certificate.location", get_ssl_certificate_location());
4247
properties.put("ssl.ca.location", get_ssl_ca_location());
48+
properties.put("enable.ssl.certificate.verification", get_enable_ssl_certificate_verification());
4349
properties.put("log_level", get_log_level());
4450
}
4551

src/dataDelivery/kafka_delivery.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,14 @@ class KafkaDelivery {
4040
return security_protocol; };
4141
std::string get_ssl_key_location() {
4242
return ssl_key_location; };
43+
std::string get_ssl_key_password() {
44+
return ssl_key_password; };
4345
std::string get_ssl_certificate_location() {
4446
return ssl_certificate_location; };
4547
std::string get_ssl_ca_location() {
4648
return ssl_ca_location; };
49+
std::string get_enable_ssl_certificate_verification() {
50+
return enable_ssl_certificate_verification; };
4751
std::string get_log_level() {
4852
return log_level; };
4953
private:
@@ -54,8 +58,10 @@ class KafkaDelivery {
5458
std::string client_id;
5559
std::string security_protocol;
5660
std::string ssl_key_location;
61+
std::string ssl_key_password;
5762
std::string ssl_certificate_location;
5863
std::string ssl_ca_location;
64+
std::string enable_ssl_certificate_verification;
5965
std::string log_level;
6066
};
6167

src/utils/cfg_handler.cc

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -993,9 +993,12 @@ bool KafkaCfgHandler::lookup_kafka_parameters(const std::string &cfg_path,
993993

994994
if (params.at("security_protocol").compare("ssl") == 0) {
995995
bool ssl_key_location = kafka_params.exists("ssl_key_location");
996+
bool ssl_key_password = kafka_params.exists("ssl_key_password");
996997
bool ssl_certificate_location =
997998
kafka_params.exists("ssl_certificate_location");
998999
bool ssl_ca_location = kafka_params.exists("ssl_ca_location");
1000+
bool enable_ssl_certificate_verification =
1001+
kafka_params.exists("enable_ssl_certificate_verification");
9991002

10001003
if (ssl_key_location == true &&
10011004
ssl_certificate_location == true &&
@@ -1032,9 +1035,55 @@ bool KafkaCfgHandler::lookup_kafka_parameters(const std::string &cfg_path,
10321035
} else {
10331036
spdlog::get("multi-logger")->
10341037
error("[security_protocol] configuration issue: "
1035-
"a valid security_protocol is mandatory");
1038+
"one or more mandatory ssl params are missing");
10361039
return false;
10371040
}
1041+
1042+
if (ssl_key_password == true) {
1043+
libconfig::Setting &ssl_key_password =
1044+
kafka_params.lookup("ssl_key_password");
1045+
try {
1046+
std::string ssl_key_password_s = ssl_key_password.c_str();
1047+
if (ssl_key_password_s.empty() == false) {
1048+
params.insert({"ssl_key_password", ssl_key_password_s});
1049+
} else {
1050+
spdlog::get("multi-logger")->
1051+
error("[security_protocol] "
1052+
"configuration issue: is invalid");
1053+
return false;
1054+
}
1055+
} catch (const libconfig::SettingTypeException &ste) {
1056+
spdlog::get("multi-logger")->error("[security_protocol] "
1057+
"configuration issue: {}", ste.what());
1058+
return false;
1059+
}
1060+
} else {
1061+
params.insert({"ssl_key_password", "NULL"});
1062+
}
1063+
1064+
if (enable_ssl_certificate_verification == true) {
1065+
libconfig::Setting &enable_ssl_certificate_verification =
1066+
kafka_params.lookup("enable_ssl_certificate_verification");
1067+
try {
1068+
std::string enable_ssl_certificate_verification_s =
1069+
enable_ssl_certificate_verification.c_str();
1070+
if (enable_ssl_certificate_verification_s.empty() == false) {
1071+
params.insert({"enable_ssl_certificate_verification",
1072+
enable_ssl_certificate_verification_s});
1073+
} else {
1074+
spdlog::get("multi-logger")->
1075+
error("[security_protocol] "
1076+
"configuration issue: is invalid");
1077+
return false;
1078+
}
1079+
} catch (const libconfig::SettingTypeException &ste) {
1080+
spdlog::get("multi-logger")->error("[security_protocol] "
1081+
"configuration issue: {}", ste.what());
1082+
return false;
1083+
}
1084+
} else {
1085+
params.insert({"enable_ssl_certificate_verification", "NULL"});
1086+
}
10381087
} else {
10391088
params.insert({"ssl_key_location", "NULL"});
10401089
params.insert({"ssl_certificate_location", "NULL"});

src/utils/cfg_handler.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,10 +206,14 @@ class KafkaCfgHandler {
206206
// return security_protocol; };
207207
//const std::string &get_kafka_ssl_key_location() const {
208208
// return ssl_key_location; };
209+
//const std::string &get_kafka_ssl_key_password() const {
210+
// return ssl_key_password; };
209211
//const std::string &get_kafka_ssl_certificate_location() const {
210212
// return ssl_certificate_location; };
211213
//const std::string &get_kafka_ssl_ca_location() const {
212214
// return ssl_ca_location; };
215+
//const std::string &get_enable_ssl_certificate_verification() const {
216+
// return enable_ssl_certificate_verification; };
213217
//const std::string &get_kafka_log_level() const {
214218
// return log_level; };
215219
private:
@@ -219,8 +223,10 @@ class KafkaCfgHandler {
219223
const std::string client_id;
220224
const std::string security_protocol;
221225
const std::string ssl_key_location;
226+
const std::string ssl_key_password;
222227
const std::string ssl_certificate_location;
223228
const std::string ssl_ca_location;
229+
const std::string enable_ssl_certificate_verification;
224230
const std::string log_level;
225231
};
226232

0 commit comments

Comments
 (0)