Skip to content

Commit e0f7b83

Browse files
committed
Merge branch '6.x' into 7.x
2 parents 0866790 + 285314d commit e0f7b83

9 files changed

+447
-98
lines changed

config.m4

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ if test "$PHP_RDKAFKA" != "no"; then
2727

2828
PHP_ADD_INCLUDE($RDKAFKA_DIR/include)
2929

30-
SOURCES="rdkafka.c metadata.c metadata_broker.c metadata_topic.c metadata_partition.c metadata_collection.c conf.c topic.c queue.c message.c fun.c kafka_consumer.c topic_partition.c kafka_error_exception.c"
30+
SOURCES="rdkafka.c metadata.c metadata_broker.c metadata_topic.c metadata_partition.c metadata_collection.c conf.c topic.c queue.c message.c fun.c kafka_consumer.c topic_partition.c kafka_error_exception.c oauthbearer.c"
3131

3232
LIBNAME=rdkafka
3333
LIBSYMBOL=rd_kafka_new

kafka_consumer.c

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "topic.h"
3232
#include "message.h"
3333
#include "metadata.h"
34+
#include "oauthbearer.h"
3435
#include "kafka_consumer_arginfo.h"
3536

3637
typedef struct _object_intern {
@@ -863,6 +864,88 @@ PHP_METHOD(RdKafka_KafkaConsumer, resumePartitions)
863864
}
864865
/* }}} */
865866

867+
/* {{{ proto int RdKafka::poll(int $timeout_ms)
868+
Polls the provided kafka handle for events */
869+
PHP_METHOD(RdKafka_KafkaConsumer, poll)
870+
{
871+
object_intern *intern;
872+
zend_long timeout;
873+
874+
if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) {
875+
return;
876+
}
877+
878+
intern = get_object(getThis());
879+
if (!intern) {
880+
return;
881+
}
882+
883+
RETURN_LONG(rd_kafka_poll(intern->rk, timeout));
884+
}
885+
/* }}} */
886+
887+
/* {{{ proto void RdKafka\KafkaConsumer::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = [])
888+
* Set SASL/OAUTHBEARER token and metadata
889+
*
890+
* The SASL/OAUTHBEARER token refresh callback or event handler should cause
891+
* this method to be invoked upon success, via
892+
* $consumer->oauthbearerSetToken(). The extension keys must not include the
893+
* reserved key "`auth`", and all extension keys and values must conform to the
894+
* required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
895+
*
896+
* key = 1*(ALPHA)
897+
* value = *(VCHAR / SP / HTAB / CR / LF )
898+
*/
899+
PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken)
900+
{
901+
object_intern *intern;
902+
char *token_value;
903+
size_t token_value_len;
904+
zval *zlifetime_ms;
905+
int64_t lifetime_ms;
906+
char *principal_name;
907+
size_t principal_len;
908+
HashTable *extensions_hash = NULL;
909+
910+
if (zend_parse_parameters(ZEND_NUM_ARGS(), "szs|h", &token_value, &token_value_len, &zlifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) {
911+
return;
912+
}
913+
914+
lifetime_ms = zval_to_int64(zlifetime_ms, "Argument #2 ($lifetime_ms) must be a valid integer");
915+
916+
intern = get_object(getThis());
917+
if (!intern) {
918+
return;
919+
}
920+
921+
oauthbearer_set_token(intern->rk, token_value, lifetime_ms, principal_name, extensions_hash);
922+
}
923+
/* }}} */
924+
925+
/* {{{ proto void RdKafka::oauthbearerSetTokenFailure(string $error)
926+
The SASL/OAUTHBEARER token refresh callback or event handler should cause
927+
this method to be invoked upon failure, via
928+
rd_kafka_oauthbearer_set_token_failure().
929+
*/
930+
PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure)
931+
{
932+
object_intern *intern;
933+
const char *errstr;
934+
size_t errstr_len;
935+
936+
if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &errstr, &errstr_len) == FAILURE) {
937+
return;
938+
}
939+
940+
intern = get_object(getThis());
941+
if (!intern) {
942+
return;
943+
}
944+
945+
oauthbearer_set_token_failure(intern->rk, errstr);
946+
}
947+
/* }}} */
948+
866949
void kafka_kafka_consumer_minit(INIT_FUNC_ARGS) /* {{{ */
867950
{
868951
ce = register_class_RdKafka_KafkaConsumer();

kafka_consumer.stub.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,13 @@ public function pausePartitions(array $topic_partitions): array {}
7979

8080
/** @tentative-return-type */
8181
public function resumePartitions(array $topic_partitions): array {}
82+
83+
/** @tentative-return-type */
84+
public function poll(int $timeout_ms): int {}
85+
86+
/** @tentative-return-type */
87+
public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {}
88+
89+
/** @tentative-return-type */
90+
public function oauthbearerSetTokenFailure(string $error): void {}
8291
}

kafka_consumer_arginfo.h

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: 5396249050f6bf118e5f830140cc016efee80def */
2+
* Stub hash: 6ebc69a2e5c4d2a7815e02ebebc84d54b93d01ec */
33

44
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1)
55
ZEND_ARG_OBJ_INFO(0, conf, RdKafka\\Conf, 0)
@@ -133,6 +133,27 @@ ZEND_END_ARG_INFO()
133133

134134
#define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions
135135

136+
#define arginfo_class_RdKafka_KafkaConsumer_poll arginfo_class_RdKafka_KafkaConsumer_getControllerId
137+
138+
#if (PHP_VERSION_ID >= 80100)
139+
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 3, IS_VOID, 0)
140+
#else
141+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 0, 3)
142+
#endif
143+
ZEND_ARG_TYPE_INFO(0, token_value, IS_STRING, 0)
144+
ZEND_ARG_TYPE_INFO(0, lifetime_ms, IS_LONG, 0)
145+
ZEND_ARG_TYPE_INFO(0, principal_name, IS_STRING, 0)
146+
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 0, "[]")
147+
ZEND_END_ARG_INFO()
148+
149+
#if (PHP_VERSION_ID >= 80100)
150+
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 1, IS_VOID, 0)
151+
#else
152+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 0, 1)
153+
#endif
154+
ZEND_ARG_TYPE_INFO(0, error, IS_STRING, 0)
155+
ZEND_END_ARG_INFO()
156+
136157

137158
ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
138159
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
@@ -159,6 +180,9 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets);
159180
ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes);
160181
ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions);
161182
ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
183+
ZEND_METHOD(RdKafka_KafkaConsumer, poll);
184+
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken);
185+
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure);
162186

163187

164188
static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
@@ -187,6 +211,9 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
187211
ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC)
188212
ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC)
189213
ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC)
214+
ZEND_ME(RdKafka_KafkaConsumer, poll, arginfo_class_RdKafka_KafkaConsumer_poll, ZEND_ACC_PUBLIC)
215+
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC)
216+
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)
190217
ZEND_FE_END
191218
};
192219

oauthbearer.c

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
+----------------------------------------------------------------------+
3+
| php-rdkafka |
4+
+----------------------------------------------------------------------+
5+
| Copyright (c) 2025 Arnaud Le Blanc |
6+
+----------------------------------------------------------------------+
7+
| This source file is subject to version 3.01 of the PHP license, |
8+
| that is bundled with this package in the file LICENSE, and is |
9+
| available through the world-wide-web at the following url: |
10+
| http://www.php.net/license/3_01.txt |
11+
| If you did not receive a copy of the PHP license and are unable to |
12+
| obtain it through the world-wide-web, please send a note to |
13+
| license@php.net so we can mail you a copy immediately. |
14+
+----------------------------------------------------------------------+
15+
| Author: Martin Fris <rasta@lj.sk> |
16+
+----------------------------------------------------------------------+
17+
*/
18+
19+
#ifdef HAVE_CONFIG_H
20+
#include "config.h"
21+
#endif
22+
23+
#include "php.h"
24+
#include "php_rdkafka.h"
25+
#include "php_rdkafka_priv.h"
26+
#include "Zend/zend_exceptions.h"
27+
#include "ext/spl/spl_exceptions.h"
28+
29+
void oauthbearer_set_token(
30+
rd_kafka_t *rk,
31+
const char *token_value,
32+
int64_t lifetime_ms,
33+
const char *principal_name,
34+
const HashTable *extensions_hash
35+
) {
36+
char errstr[512];
37+
rd_kafka_resp_err_t ret = 0;
38+
39+
errstr[0] = '\0';
40+
41+
int extensions_size = 0;
42+
char **extensions = NULL;
43+
44+
if (extensions_hash != NULL) {
45+
extensions_size = zend_hash_num_elements(extensions_hash) * 2;
46+
extensions = safe_emalloc((extensions_size * 2), sizeof(char *), 0);
47+
48+
int pos = 0;
49+
zend_ulong num_key;
50+
zend_string *extension_key_str;
51+
zval *extension_zval;
52+
ZEND_HASH_FOREACH_KEY_VAL((HashTable*)extensions_hash, num_key, extension_key_str, extension_zval) {
53+
if (!extension_key_str) {
54+
extension_key_str = zend_long_to_str(num_key);
55+
extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str));
56+
zend_string_release(extension_key_str);
57+
} else {
58+
extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str));
59+
}
60+
61+
zend_string *tmp_extension_val_str;
62+
zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str);
63+
extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str));
64+
if (tmp_extension_val_str) {
65+
zend_string_release(tmp_extension_val_str);
66+
}
67+
} ZEND_HASH_FOREACH_END();
68+
}
69+
70+
ret = rd_kafka_oauthbearer_set_token(
71+
rk,
72+
token_value,
73+
lifetime_ms,
74+
principal_name,
75+
(const char **)extensions,
76+
extensions_size,
77+
errstr,
78+
sizeof(errstr));
79+
80+
if (extensions != NULL) {
81+
for (int i = 0; i < extensions_size; i++) {
82+
efree(extensions[i]);
83+
}
84+
efree(extensions);
85+
}
86+
87+
switch (ret) {
88+
case RD_KAFKA_RESP_ERR__INVALID_ARG:
89+
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG);
90+
return;
91+
case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED:
92+
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED);
93+
return;
94+
case RD_KAFKA_RESP_ERR__STATE:
95+
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE);
96+
return;
97+
case RD_KAFKA_RESP_ERR_NO_ERROR:
98+
break;
99+
default:
100+
return;
101+
}
102+
}
103+
104+
void oauthbearer_set_token_failure(rd_kafka_t *rk, const char *errstr) {
105+
rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(rk, errstr);
106+
107+
switch (ret) {
108+
case RD_KAFKA_RESP_ERR__INVALID_ARG:
109+
zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG);
110+
return;
111+
case RD_KAFKA_RESP_ERR__STATE:
112+
zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE);
113+
return;
114+
case RD_KAFKA_RESP_ERR_NO_ERROR:
115+
break;
116+
default:
117+
return;
118+
}
119+
}
120+
121+
int64_t zval_to_int64(zval *zval, const char *errstr) {
122+
int64_t converted;
123+
124+
switch (Z_TYPE_P(zval)) {
125+
case IS_LONG:
126+
return (int64_t) Z_LVAL_P(zval);
127+
break;
128+
case IS_DOUBLE:
129+
return (int64_t) Z_DVAL_P(zval);
130+
break;
131+
case IS_STRING:;
132+
char *str = Z_STRVAL_P(zval);
133+
char *end;
134+
converted = (int64_t) strtoll(str, &end, 10);
135+
if (end != str + Z_STRLEN_P(zval)) {
136+
zend_throw_exception(spl_ce_InvalidArgumentException, errstr, 0);
137+
return 0;
138+
}
139+
return converted;
140+
break;
141+
EMPTY_SWITCH_DEFAULT_CASE();
142+
}
143+
}

oauthbearer.h

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
+----------------------------------------------------------------------+
3+
| php-rdkafka |
4+
+----------------------------------------------------------------------+
5+
| Copyright (c) 2025 Arnaud Le Blanc |
6+
+----------------------------------------------------------------------+
7+
| This source file is subject to version 3.01 of the PHP license, |
8+
| that is bundled with this package in the file LICENSE, and is |
9+
| available through the world-wide-web at the following url: |
10+
| http://www.php.net/license/3_01.txt |
11+
| If you did not receive a copy of the PHP license and are unable to |
12+
| obtain it through the world-wide-web, please send a note to |
13+
| license@php.net so we can mail you a copy immediately. |
14+
+----------------------------------------------------------------------+
15+
| Author: Martin Fris <rasta@lj.sk> |
16+
+----------------------------------------------------------------------+
17+
*/
18+
19+
#include "librdkafka/rdkafka.h"
20+
21+
void oauthbearer_set_token(rd_kafka_t *rk,
22+
const char *token_value,
23+
int64_t lifetime_ms,
24+
const char *principal_name,
25+
const HashTable *extensions_hash
26+
);
27+
28+
void oauthbearer_set_token_failure(rd_kafka_t *rk, const char *errstr);
29+
30+
int64_t zval_to_int64(zval *zval, const char *errstr);

package.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@
7575
<file role="src" name="metadata_topic_arginfo.h"/>
7676
<file role="src" name="php_rdkafka.h"/>
7777
<file role="src" name="php_rdkafka_priv.h"/>
78+
<file role="src" name="oauthbearer.c"/>
79+
<file role="src" name="oauthbearer.h"/>
7880
<file role="src" name="queue.c"/>
7981
<file role="src" name="queue.h"/>
8082
<file role="src" name="queue.stub.php"/>
@@ -122,6 +124,7 @@
122124
<file role="test" name="metadata_topic_001.phpt"/>
123125
<file role="test" name="new_topic_with_conf.phpt"/>
124126
<file role="test" name="oauthbearer_integration.phpt"/>
127+
<file role="test" name="oauthbearer_integration_hl.phpt"/>
125128
<file role="test" name="pause_resume.phpt"/>
126129
<file role="test" name="produce_consume.phpt"/>
127130
<file role="test" name="produce_consume_queue.phpt"/>

0 commit comments

Comments
 (0)