Skip to content

Commit 285314d

Browse files
authored
SASL_SSL OAUTHBEARER support for high level consumer (#581)
1 parent 4e0d407 commit 285314d

10 files changed

+509
-106
lines changed

config.m4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ if test "$PHP_RDKAFKA" != "no"; then
9696

9797
AC_CHECK_LIB($LIBNAME,[rd_kafka_oauthbearer_set_token],[
9898
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER,1,[ ])
99+
SOURCES="$SOURCES oauthbearer.c"
99100
],[
100101
AC_MSG_WARN([oauthbearer support is not available])
101102
])

kafka_consumer.c

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
#include "topic.h"
3232
#include "message.h"
3333
#include "metadata.h"
34+
#ifdef HAS_RD_KAFKA_OAUTHBEARER
35+
#include "oauthbearer.h"
36+
#endif
3437
#if PHP_VERSION_ID < 80000
3538
#include "kafka_consumer_legacy_arginfo.h"
3639
#else
@@ -869,6 +872,90 @@ PHP_METHOD(RdKafka_KafkaConsumer, resumePartitions)
869872
}
870873
/* }}} */
871874

875+
/* {{{ proto int RdKafka::poll(int $timeout_ms)
876+
Polls the provided kafka handle for events */
877+
PHP_METHOD(RdKafka_KafkaConsumer, poll)
878+
{
879+
object_intern *intern;
880+
zend_long timeout;
881+
882+
if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) {
883+
return;
884+
}
885+
886+
intern = get_object(getThis());
887+
if (!intern) {
888+
return;
889+
}
890+
891+
RETURN_LONG(rd_kafka_poll(intern->rk, timeout));
892+
}
893+
/* }}} */
894+
895+
#ifdef HAS_RD_KAFKA_OAUTHBEARER
896+
/* {{{ proto void RdKafka\KafkaConsumer::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = [])
897+
* Set SASL/OAUTHBEARER token and metadata
898+
*
899+
* The SASL/OAUTHBEARER token refresh callback or event handler should cause
900+
* this method to be invoked upon success, via
901+
* $consumer->oauthbearerSetToken(). The extension keys must not include the
902+
* reserved key "`auth`", and all extension keys and values must conform to the
903+
* required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
904+
*
905+
* key = 1*(ALPHA)
906+
* value = *(VCHAR / SP / HTAB / CR / LF )
907+
*/
908+
PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken)
909+
{
910+
object_intern *intern;
911+
char *token_value;
912+
size_t token_value_len;
913+
zval *zlifetime_ms;
914+
int64_t lifetime_ms;
915+
char *principal_name;
916+
size_t principal_len;
917+
HashTable *extensions_hash = NULL;
918+
919+
if (zend_parse_parameters(ZEND_NUM_ARGS(), "szs|h", &token_value, &token_value_len, &zlifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) {
920+
return;
921+
}
922+
923+
lifetime_ms = zval_to_int64(zlifetime_ms, "Argument #2 ($lifetime_ms) must be a valid integer");
924+
925+
intern = get_object(getThis());
926+
if (!intern) {
927+
return;
928+
}
929+
930+
oauthbearer_set_token(intern->rk, token_value, lifetime_ms, principal_name, extensions_hash);
931+
}
932+
/* }}} */
933+
934+
/* {{{ proto void RdKafka::oauthbearerSetTokenFailure(string $error)
935+
The SASL/OAUTHBEARER token refresh callback or event handler should cause
936+
this method to be invoked upon failure, via
937+
rd_kafka_oauthbearer_set_token_failure().
938+
*/
939+
PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure)
940+
{
941+
object_intern *intern;
942+
const char *errstr;
943+
size_t errstr_len;
944+
945+
if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &errstr, &errstr_len) == FAILURE) {
946+
return;
947+
}
948+
949+
intern = get_object(getThis());
950+
if (!intern) {
951+
return;
952+
}
953+
954+
oauthbearer_set_token_failure(intern->rk, errstr);
955+
}
956+
/* }}} */
957+
#endif
958+
872959
void kafka_kafka_consumer_minit(INIT_FUNC_ARGS) /* {{{ */
873960
{
874961
ce = register_class_RdKafka_KafkaConsumer();

kafka_consumer.stub.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,15 @@ public function pausePartitions(array $topic_partitions): array {}
8181

8282
/** @tentative-return-type */
8383
public function resumePartitions(array $topic_partitions): array {}
84+
85+
/** @tentative-return-type */
86+
public function poll(int $timeout_ms): int {}
87+
88+
#ifdef HAS_RD_KAFKA_OAUTHBEARER
89+
/** @tentative-return-type */
90+
public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {}
91+
92+
/** @tentative-return-type */
93+
public function oauthbearerSetTokenFailure(string $error): void {}
94+
#endif
8495
}

kafka_consumer_arginfo.h

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: 19d0e5f9de1e91016dd8e8c87e88c3d17e0c094f */
2+
* Stub hash: c992beb679f8970a0d6af9fb06592d8c3c4cb232 */
33

44
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1)
55
ZEND_ARG_OBJ_INFO(0, conf, RdKafka\\Conf, 0)
@@ -135,6 +135,38 @@ ZEND_END_ARG_INFO()
135135

136136
#define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions
137137

138+
#if (PHP_VERSION_ID >= 80100)
139+
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 1, IS_LONG, 0)
140+
#else
141+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 0, 1)
142+
#endif
143+
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
144+
ZEND_END_ARG_INFO()
145+
146+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
147+
#if (PHP_VERSION_ID >= 80100)
148+
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 3, IS_VOID, 0)
149+
#else
150+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 0, 3)
151+
#endif
152+
ZEND_ARG_TYPE_INFO(0, token_value, IS_STRING, 0)
153+
ZEND_ARG_TYPE_INFO(0, lifetime_ms, IS_LONG, 0)
154+
ZEND_ARG_TYPE_INFO(0, principal_name, IS_STRING, 0)
155+
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 0, "[]")
156+
ZEND_END_ARG_INFO()
157+
#endif
158+
159+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
160+
#if (PHP_VERSION_ID >= 80100)
161+
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 1, IS_VOID, 0)
162+
#else
163+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 0, 1)
164+
#endif
165+
ZEND_ARG_TYPE_INFO(0, error, IS_STRING, 0)
166+
ZEND_END_ARG_INFO()
167+
#endif
168+
169+
138170
ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
139171
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
140172
#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
@@ -162,6 +194,14 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets);
162194
ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes);
163195
ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions);
164196
ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
197+
ZEND_METHOD(RdKafka_KafkaConsumer, poll);
198+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
199+
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken);
200+
#endif
201+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
202+
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure);
203+
#endif
204+
165205

166206
static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
167207
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
@@ -191,6 +231,13 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
191231
ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC)
192232
ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC)
193233
ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC)
234+
ZEND_ME(RdKafka_KafkaConsumer, poll, arginfo_class_RdKafka_KafkaConsumer_poll, ZEND_ACC_PUBLIC)
235+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
236+
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC)
237+
#endif
238+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
239+
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)
240+
#endif
194241
ZEND_FE_END
195242
};
196243

@@ -199,11 +246,7 @@ static zend_class_entry *register_class_RdKafka_KafkaConsumer(void)
199246
zend_class_entry ce, *class_entry;
200247

201248
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumer", class_RdKafka_KafkaConsumer_methods);
202-
#if (PHP_VERSION_ID >= 80400)
203-
class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
204-
#else
205249
class_entry = zend_register_internal_class_ex(&ce, NULL);
206-
#endif
207250

208251
zval property_error_cb_default_value;
209252
ZVAL_UNDEF(&property_error_cb_default_value);

kafka_consumer_legacy_arginfo.h

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: 19d0e5f9de1e91016dd8e8c87e88c3d17e0c094f */
2+
* Stub hash: c992beb679f8970a0d6af9fb06592d8c3c4cb232 */
33

44
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1)
55
ZEND_ARG_INFO(0, conf)
@@ -82,6 +82,24 @@ ZEND_END_ARG_INFO()
8282

8383
#define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions
8484

85+
#define arginfo_class_RdKafka_KafkaConsumer_poll arginfo_class_RdKafka_KafkaConsumer_consume
86+
87+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
88+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 0, 3)
89+
ZEND_ARG_INFO(0, token_value)
90+
ZEND_ARG_INFO(0, lifetime_ms)
91+
ZEND_ARG_INFO(0, principal_name)
92+
ZEND_ARG_INFO(0, extensions)
93+
ZEND_END_ARG_INFO()
94+
#endif
95+
96+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
97+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 0, 1)
98+
ZEND_ARG_INFO(0, error)
99+
ZEND_END_ARG_INFO()
100+
#endif
101+
102+
85103
ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
86104
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
87105
#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
@@ -109,6 +127,14 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets);
109127
ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes);
110128
ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions);
111129
ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
130+
ZEND_METHOD(RdKafka_KafkaConsumer, poll);
131+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
132+
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken);
133+
#endif
134+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
135+
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure);
136+
#endif
137+
112138

113139
static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
114140
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
@@ -138,6 +164,13 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
138164
ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC)
139165
ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC)
140166
ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC)
167+
ZEND_ME(RdKafka_KafkaConsumer, poll, arginfo_class_RdKafka_KafkaConsumer_poll, ZEND_ACC_PUBLIC)
168+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
169+
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC)
170+
#endif
171+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
172+
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)
173+
#endif
141174
ZEND_FE_END
142175
};
143176

@@ -146,11 +179,7 @@ static zend_class_entry *register_class_RdKafka_KafkaConsumer(void)
146179
zend_class_entry ce, *class_entry;
147180

148181
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumer", class_RdKafka_KafkaConsumer_methods);
149-
#if (PHP_VERSION_ID >= 80400)
150-
class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
151-
#else
152182
class_entry = zend_register_internal_class_ex(&ce, NULL);
153-
#endif
154183

155184
zval property_error_cb_default_value;
156185
ZVAL_NULL(&property_error_cb_default_value);

0 commit comments

Comments
 (0)