Skip to content

Commit 7335e7a

Browse files
committed
cleanup
1 parent 0b70908 commit 7335e7a

File tree

6 files changed

+222
-256
lines changed

6 files changed

+222
-256
lines changed

config.m4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ if test "$PHP_RDKAFKA" != "no"; then
9696

9797
AC_CHECK_LIB($LIBNAME,[rd_kafka_oauthbearer_set_token],[
9898
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER,1,[ ])
99+
SOURCES="$SOURCES oauthbearer.c"
99100
],[
100101
AC_MSG_WARN([oauthbearer support is not available])
101102
])

kafka_consumer.c

Lines changed: 12 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
#include "topic.h"
3232
#include "message.h"
3333
#include "metadata.h"
34+
#ifdef HAS_RD_KAFKA_OAUTHBEARER
35+
#include "oauthbearer.h"
36+
#endif
3437
#if PHP_VERSION_ID < 80000
3538
#include "kafka_consumer_legacy_arginfo.h"
3639
#else
@@ -869,12 +872,12 @@ PHP_METHOD(RdKafka_KafkaConsumer, resumePartitions)
869872
/* }}} */
870873

871874
#ifdef HAS_RD_KAFKA_OAUTHBEARER
872-
/* {{{ proto void RdKafka::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = [])
875+
/* {{{ proto void RdKafka\KafkaConsumer::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = [])
873876
* Set SASL/OAUTHBEARER token and metadata
874877
*
875878
* The SASL/OAUTHBEARER token refresh callback or event handler should cause
876879
* this method to be invoked upon success, via
877-
* $kafka->oauthbearerSetToken(). The extension keys must not include the
880+
* $consumer->oauthbearerSetToken(). The extension keys must not include the
878881
* reserved key "`auth`", and all extension keys and values must conform to the
879882
* required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
880883
*
@@ -886,86 +889,24 @@ PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken)
886889
object_intern *intern;
887890
char *token_value;
888891
size_t token_value_len;
889-
zend_long lifetime_ms;
892+
zval *zlifetime_ms;
893+
int64_t lifetime_ms;
890894
char *principal_name;
891895
size_t principal_len;
892896
HashTable *extensions_hash = NULL;
893897

894-
char errstr[512];
895-
rd_kafka_resp_err_t ret = 0;
896-
897-
if (zend_parse_parameters(ZEND_NUM_ARGS(), "sls|h", &token_value, &token_value_len, &lifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) {
898+
if (zend_parse_parameters(ZEND_NUM_ARGS(), "szs|h", &token_value, &token_value_len, &zlifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) {
898899
return;
899900
}
900901

902+
lifetime_ms = zval_to_int64(zlifetime_ms, "Argument #2 ($lifetime_ms) must be a valid integer");
903+
901904
intern = get_object(getThis());
902905
if (!intern) {
903906
return;
904907
}
905908

906-
errstr[0] = '\0';
907-
908-
int extensions_size = 0;
909-
char **extensions = NULL;
910-
911-
if (extensions_hash != NULL) {
912-
extensions_size = zend_hash_num_elements(extensions_hash) * 2;
913-
extensions = safe_emalloc((extensions_size * 2), sizeof(char *), 0);
914-
915-
int pos = 0;
916-
zend_ulong num_key;
917-
zend_string *extension_key_str;
918-
zval *extension_zval;
919-
ZEND_HASH_FOREACH_KEY_VAL(extensions_hash, num_key, extension_key_str, extension_zval) {
920-
if (!extension_key_str) {
921-
extension_key_str = zend_long_to_str(num_key);
922-
extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str));
923-
zend_string_release(extension_key_str);
924-
} else {
925-
extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str));
926-
}
927-
928-
zend_string *tmp_extension_val_str;
929-
zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str);
930-
extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str));
931-
if (tmp_extension_val_str) {
932-
zend_string_release(tmp_extension_val_str);
933-
}
934-
} ZEND_HASH_FOREACH_END();
935-
}
936-
937-
ret = rd_kafka_oauthbearer_set_token(
938-
intern->rk,
939-
token_value,
940-
lifetime_ms,
941-
principal_name,
942-
(const char **)extensions,
943-
extensions_size,
944-
errstr,
945-
sizeof(errstr));
946-
947-
if (extensions != NULL) {
948-
for (int i = 0; i < extensions_size; i++) {
949-
efree(extensions[i]);
950-
}
951-
efree(extensions);
952-
}
953-
954-
switch (ret) {
955-
case RD_KAFKA_RESP_ERR__INVALID_ARG:
956-
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG);
957-
return;
958-
case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED:
959-
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED);
960-
return;
961-
case RD_KAFKA_RESP_ERR__STATE:
962-
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE);
963-
return;
964-
case RD_KAFKA_RESP_ERR_NO_ERROR:
965-
break;
966-
default:
967-
return;
968-
}
909+
oauthbearer_set_token(intern->rk, token_value, lifetime_ms, principal_name, extensions_hash);
969910
}
970911
/* }}} */
971912

@@ -989,20 +930,7 @@ PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure)
989930
return;
990931
}
991932

992-
rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(intern->rk, errstr);
993-
994-
switch (ret) {
995-
case RD_KAFKA_RESP_ERR__INVALID_ARG:
996-
zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG);
997-
return;
998-
case RD_KAFKA_RESP_ERR__STATE:
999-
zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE);
1000-
return;
1001-
case RD_KAFKA_RESP_ERR_NO_ERROR:
1002-
break;
1003-
default:
1004-
return;
1005-
}
933+
oauthbearer_set_token_failure(intern->rk, errstr);
1006934
}
1007935
/* }}} */
1008936
#endif

oauthbearer.c

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
+----------------------------------------------------------------------+
3+
| php-rdkafka |
4+
+----------------------------------------------------------------------+
5+
| Copyright (c) 2025 Arnaud Le Blanc |
6+
+----------------------------------------------------------------------+
7+
| This source file is subject to version 3.01 of the PHP license, |
8+
| that is bundled with this package in the file LICENSE, and is |
9+
| available through the world-wide-web at the following url: |
10+
| http://www.php.net/license/3_01.txt |
11+
| If you did not receive a copy of the PHP license and are unable to |
12+
| obtain it through the world-wide-web, please send a note to |
13+
| license@php.net so we can mail you a copy immediately. |
14+
+----------------------------------------------------------------------+
15+
| Author: Martin Fris <rasta@lj.sk> |
16+
+----------------------------------------------------------------------+
17+
*/
18+
19+
#ifdef HAVE_CONFIG_H
20+
#include "config.h"
21+
#endif
22+
23+
#include "php.h"
24+
#include "php_rdkafka.h"
25+
#include "php_rdkafka_priv.h"
26+
#include "librdkafka/rdkafka.h"
27+
#include "Zend/zend_exceptions.h"
28+
#include "ext/spl/spl_exceptions.h"
29+
#include "conf.h"
30+
#include "topic_partition.h"
31+
#include "topic.h"
32+
#include "message.h"
33+
#include "metadata.h"
34+
#if PHP_VERSION_ID < 80000
35+
#include "kafka_consumer_legacy_arginfo.h"
36+
#else
37+
#include "kafka_consumer_arginfo.h"
38+
#endif
39+
40+
void oauthbearer_set_token(
41+
rd_kafka_t *rk,
42+
const char *token_value,
43+
zend_long lifetime_ms,
44+
const char *principal_name,
45+
const HashTable *extensions_hash
46+
) {
47+
char errstr[512];
48+
rd_kafka_resp_err_t ret = 0;
49+
50+
errstr[0] = '\0';
51+
52+
int extensions_size = 0;
53+
char **extensions = NULL;
54+
55+
if (extensions_hash != NULL) {
56+
extensions_size = zend_hash_num_elements(extensions_hash) * 2;
57+
extensions = safe_emalloc((extensions_size * 2), sizeof(char *), 0);
58+
59+
int pos = 0;
60+
zend_ulong num_key;
61+
zend_string *extension_key_str;
62+
zval *extension_zval;
63+
ZEND_HASH_FOREACH_KEY_VAL(extensions_hash, num_key, extension_key_str, extension_zval) {
64+
if (!extension_key_str) {
65+
extension_key_str = zend_long_to_str(num_key);
66+
extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str));
67+
zend_string_release(extension_key_str);
68+
} else {
69+
extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str));
70+
}
71+
72+
zend_string *tmp_extension_val_str;
73+
zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str);
74+
extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str));
75+
if (tmp_extension_val_str) {
76+
zend_string_release(tmp_extension_val_str);
77+
}
78+
} ZEND_HASH_FOREACH_END();
79+
}
80+
81+
ret = rd_kafka_oauthbearer_set_token(
82+
rk,
83+
token_value,
84+
lifetime_ms,
85+
principal_name,
86+
(const char **)extensions,
87+
extensions_size,
88+
errstr,
89+
sizeof(errstr));
90+
91+
if (extensions != NULL) {
92+
for (int i = 0; i < extensions_size; i++) {
93+
efree(extensions[i]);
94+
}
95+
efree(extensions);
96+
}
97+
98+
switch (ret) {
99+
case RD_KAFKA_RESP_ERR__INVALID_ARG:
100+
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG);
101+
return;
102+
case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED:
103+
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED);
104+
return;
105+
case RD_KAFKA_RESP_ERR__STATE:
106+
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE);
107+
return;
108+
case RD_KAFKA_RESP_ERR_NO_ERROR:
109+
break;
110+
default:
111+
return;
112+
}
113+
}
114+
115+
void oauthbearer_set_token_failure(rd_kafka_t *rk, const char *errstr) {
116+
rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(rk, errstr);
117+
118+
switch (ret) {
119+
case RD_KAFKA_RESP_ERR__INVALID_ARG:
120+
zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG);
121+
return;
122+
case RD_KAFKA_RESP_ERR__STATE:
123+
zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE);
124+
return;
125+
case RD_KAFKA_RESP_ERR_NO_ERROR:
126+
break;
127+
default:
128+
return;
129+
}
130+
}
131+
132+
int64_t zval_to_int64(zval *zval, const char *errstr) {
133+
int64_t converted;
134+
135+
/* On 32-bits, it might be required to pass $lifetime_ms as a float or a
136+
* string */
137+
switch (Z_TYPE_P(zval)) {
138+
case IS_LONG:
139+
return (int64_t) Z_LVAL_P(zval);
140+
break;
141+
case IS_DOUBLE:
142+
return (int64_t) Z_DVAL_P(zval);
143+
break;
144+
case IS_STRING:;
145+
char *str = Z_STRVAL_P(zval);
146+
char *end;
147+
converted = (int64_t) strtoll(str, &end, 10);
148+
if (end != str + Z_STRLEN_P(zval)) {
149+
zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0, errstr);
150+
return 0;
151+
}
152+
break;
153+
EMPTY_SWITCH_DEFAULT_CASE();
154+
}
155+
}

oauthbearer.h

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
+----------------------------------------------------------------------+
3+
| php-rdkafka |
4+
+----------------------------------------------------------------------+
5+
| Copyright (c) 2025 Arnaud Le Blanc |
6+
+----------------------------------------------------------------------+
7+
| This source file is subject to version 3.01 of the PHP license, |
8+
| that is bundled with this package in the file LICENSE, and is |
9+
| available through the world-wide-web at the following url: |
10+
| http://www.php.net/license/3_01.txt |
11+
| If you did not receive a copy of the PHP license and are unable to |
12+
| obtain it through the world-wide-web, please send a note to |
13+
| license@php.net so we can mail you a copy immediately. |
14+
+----------------------------------------------------------------------+
15+
| Author: Martin Fris <rasta@lj.sk> |
16+
+----------------------------------------------------------------------+
17+
*/
18+
19+
#include "librdkafka/rdkafka.h"
20+
21+
void oauthbearer_set_token(rd_kafka_t *rk,
22+
const char *token_value,
23+
zend_long lifetime_ms,
24+
const char *principal_name,
25+
const HashTable *extensions_hash
26+
);
27+
28+
void oauthbearer_set_token_failure(rd_kafka_t *rk, const char *errstr);
29+
30+
int64_t zval_to_int64(zval *zval, const char *errstr);

0 commit comments

Comments
 (0)