Skip to content

Commit 0b70908

Browse files
committed
OAUTHBEARER support for high level consumer
1 parent 2df0c74 commit 0b70908

File tree

4 files changed

+368
-0
lines changed

4 files changed

+368
-0
lines changed

kafka_consumer.c

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -868,6 +868,145 @@ PHP_METHOD(RdKafka_KafkaConsumer, resumePartitions)
868868
}
869869
/* }}} */
870870

871+
#ifdef HAS_RD_KAFKA_OAUTHBEARER
872+
/* {{{ proto void RdKafka::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = [])
873+
* Set SASL/OAUTHBEARER token and metadata
874+
*
875+
* The SASL/OAUTHBEARER token refresh callback or event handler should cause
876+
* this method to be invoked upon success, via
877+
* $kafka->oauthbearerSetToken(). The extension keys must not include the
878+
* reserved key "`auth`", and all extension keys and values must conform to the
879+
* required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
880+
*
881+
* key = 1*(ALPHA)
882+
* value = *(VCHAR / SP / HTAB / CR / LF )
883+
*/
884+
PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken)
885+
{
886+
object_intern *intern;
887+
char *token_value;
888+
size_t token_value_len;
889+
zend_long lifetime_ms;
890+
char *principal_name;
891+
size_t principal_len;
892+
HashTable *extensions_hash = NULL;
893+
894+
char errstr[512];
895+
rd_kafka_resp_err_t ret = 0;
896+
897+
if (zend_parse_parameters(ZEND_NUM_ARGS(), "sls|h", &token_value, &token_value_len, &lifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) {
898+
return;
899+
}
900+
901+
intern = get_object(getThis());
902+
if (!intern) {
903+
return;
904+
}
905+
906+
errstr[0] = '\0';
907+
908+
int extensions_size = 0;
909+
char **extensions = NULL;
910+
911+
if (extensions_hash != NULL) {
912+
extensions_size = zend_hash_num_elements(extensions_hash) * 2;
913+
extensions = safe_emalloc((extensions_size * 2), sizeof(char *), 0);
914+
915+
int pos = 0;
916+
zend_ulong num_key;
917+
zend_string *extension_key_str;
918+
zval *extension_zval;
919+
ZEND_HASH_FOREACH_KEY_VAL(extensions_hash, num_key, extension_key_str, extension_zval) {
920+
if (!extension_key_str) {
921+
extension_key_str = zend_long_to_str(num_key);
922+
extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str));
923+
zend_string_release(extension_key_str);
924+
} else {
925+
extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str));
926+
}
927+
928+
zend_string *tmp_extension_val_str;
929+
zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str);
930+
extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str));
931+
if (tmp_extension_val_str) {
932+
zend_string_release(tmp_extension_val_str);
933+
}
934+
} ZEND_HASH_FOREACH_END();
935+
}
936+
937+
ret = rd_kafka_oauthbearer_set_token(
938+
intern->rk,
939+
token_value,
940+
lifetime_ms,
941+
principal_name,
942+
(const char **)extensions,
943+
extensions_size,
944+
errstr,
945+
sizeof(errstr));
946+
947+
if (extensions != NULL) {
948+
for (int i = 0; i < extensions_size; i++) {
949+
efree(extensions[i]);
950+
}
951+
efree(extensions);
952+
}
953+
954+
switch (ret) {
955+
case RD_KAFKA_RESP_ERR__INVALID_ARG:
956+
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG);
957+
return;
958+
case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED:
959+
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED);
960+
return;
961+
case RD_KAFKA_RESP_ERR__STATE:
962+
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE);
963+
return;
964+
case RD_KAFKA_RESP_ERR_NO_ERROR:
965+
break;
966+
default:
967+
return;
968+
}
969+
}
970+
/* }}} */
971+
972+
/* {{{ proto void RdKafka::oauthbearerSetTokenFailure(string $error)
973+
The SASL/OAUTHBEARER token refresh callback or event handler should cause
974+
this method to be invoked upon failure, via
975+
rd_kafka_oauthbearer_set_token_failure().
976+
*/
977+
PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure)
978+
{
979+
object_intern *intern;
980+
const char *errstr;
981+
size_t errstr_len;
982+
983+
if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &errstr, &errstr_len) == FAILURE) {
984+
return;
985+
}
986+
987+
intern = get_object(getThis());
988+
if (!intern) {
989+
return;
990+
}
991+
992+
rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(intern->rk, errstr);
993+
994+
switch (ret) {
995+
case RD_KAFKA_RESP_ERR__INVALID_ARG:
996+
zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG);
997+
return;
998+
case RD_KAFKA_RESP_ERR__STATE:
999+
zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE);
1000+
return;
1001+
case RD_KAFKA_RESP_ERR_NO_ERROR:
1002+
break;
1003+
default:
1004+
return;
1005+
}
1006+
}
1007+
/* }}} */
1008+
#endif
1009+
8711010
void kafka_kafka_consumer_minit(INIT_FUNC_ARGS) /* {{{ */
8721011
{
8731012
ce = register_class_RdKafka_KafkaConsumer();

kafka_consumer.stub.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,12 @@ public function pausePartitions(array $topic_partitions): array {}
8181

8282
/** @tentative-return-type */
8383
public function resumePartitions(array $topic_partitions): array {}
84+
85+
#ifdef HAS_RD_KAFKA_OAUTHBEARER
86+
/** @tentative-return-type */
87+
public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {}
88+
89+
/** @tentative-return-type */
90+
public function oauthbearerSetTokenFailure(string $error): void {}
91+
#endif
8492
}

kafka_consumer_arginfo.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,27 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_queryWatermarkOffsets
129129
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
130130
ZEND_END_ARG_INFO()
131131

132+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
133+
#if (PHP_VERSION_ID >= 80100)
134+
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 3, IS_VOID, 0)
135+
#else
136+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 0, 3)
137+
#endif
138+
ZEND_ARG_TYPE_INFO(0, token_value, IS_STRING, 0)
139+
ZEND_ARG_TYPE_INFO(0, lifetime_ms, IS_LONG, 0)
140+
ZEND_ARG_TYPE_INFO(0, principal_name, IS_STRING, 0)
141+
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 0, "[]")
142+
ZEND_END_ARG_INFO()
143+
144+
#if (PHP_VERSION_ID >= 80100)
145+
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 1, IS_VOID, 0)
146+
#else
147+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 0, 1)
148+
#endif
149+
ZEND_ARG_TYPE_INFO(0, error, IS_STRING, 0)
150+
ZEND_END_ARG_INFO()
151+
#endif
152+
132153
#define arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes arginfo_class_RdKafka_KafkaConsumer_getCommittedOffsets
133154

134155
#define arginfo_class_RdKafka_KafkaConsumer_pausePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions
@@ -162,6 +183,10 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets);
162183
ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes);
163184
ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions);
164185
ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
186+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
187+
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken);
188+
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure);
189+
#endif
165190

166191
static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
167192
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
@@ -191,6 +216,10 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
191216
ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC)
192217
ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC)
193218
ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC)
219+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
220+
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC)
221+
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)
222+
#endif
194223
ZEND_FE_END
195224
};
196225

0 commit comments

Comments
 (0)