Skip to content

SASL_SSL OAUTHBEARER support for high level consumer #581

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: 6.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ if test "$PHP_RDKAFKA" != "no"; then

AC_CHECK_LIB($LIBNAME,[rd_kafka_oauthbearer_set_token],[
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER,1,[ ])
SOURCES="$SOURCES oauthbearer.c"
],[
AC_MSG_WARN([oauthbearer support is not available])
])
Expand Down
87 changes: 87 additions & 0 deletions kafka_consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#include "topic.h"
#include "message.h"
#include "metadata.h"
#ifdef HAS_RD_KAFKA_OAUTHBEARER
#include "oauthbearer.h"
#endif
#if PHP_VERSION_ID < 80000
#include "kafka_consumer_legacy_arginfo.h"
#else
Expand Down Expand Up @@ -868,6 +871,90 @@ PHP_METHOD(RdKafka_KafkaConsumer, resumePartitions)
}
/* }}} */

/* {{{ proto int RdKafka::poll(int $timeout_ms)
Polls the provided kafka handle for events */
PHP_METHOD(RdKafka_KafkaConsumer, poll)
{
object_intern *intern;
zend_long timeout;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) {
return;
}

intern = get_object(getThis());
if (!intern) {
return;
}

RETURN_LONG(rd_kafka_poll(intern->rk, timeout));
}
/* }}} */

#ifdef HAS_RD_KAFKA_OAUTHBEARER
/* {{{ proto void RdKafka\KafkaConsumer::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = [])
* Set SASL/OAUTHBEARER token and metadata
*
* The SASL/OAUTHBEARER token refresh callback or event handler should cause
* this method to be invoked upon success, via
* $consumer->oauthbearerSetToken(). The extension keys must not include the
* reserved key "`auth`", and all extension keys and values must conform to the
* required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
*
* key = 1*(ALPHA)
* value = *(VCHAR / SP / HTAB / CR / LF )
*/
PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken)
{
object_intern *intern;
char *token_value;
size_t token_value_len;
zval *zlifetime_ms;
int64_t lifetime_ms;
char *principal_name;
size_t principal_len;
HashTable *extensions_hash = NULL;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "szs|h", &token_value, &token_value_len, &zlifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) {
return;
}

lifetime_ms = zval_to_int64(zlifetime_ms, "Argument #2 ($lifetime_ms) must be a valid integer");

intern = get_object(getThis());
if (!intern) {
return;
}

oauthbearer_set_token(intern->rk, token_value, lifetime_ms, principal_name, extensions_hash);
}
/* }}} */

/* {{{ proto void RdKafka::oauthbearerSetTokenFailure(string $error)
The SASL/OAUTHBEARER token refresh callback or event handler should cause
this method to be invoked upon failure, via
rd_kafka_oauthbearer_set_token_failure().
*/
PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure)
{
object_intern *intern;
const char *errstr;
size_t errstr_len;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &errstr, &errstr_len) == FAILURE) {
return;
}

intern = get_object(getThis());
if (!intern) {
return;
}

oauthbearer_set_token_failure(intern->rk, errstr);
}
/* }}} */
#endif

void kafka_kafka_consumer_minit(INIT_FUNC_ARGS) /* {{{ */
{
ce = register_class_RdKafka_KafkaConsumer();
Expand Down
11 changes: 11 additions & 0 deletions kafka_consumer.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,15 @@ public function pausePartitions(array $topic_partitions): array {}

/** @tentative-return-type */
public function resumePartitions(array $topic_partitions): array {}

/** @tentative-return-type */
public function poll(int $timeout_ms): int {}

#ifdef HAS_RD_KAFKA_OAUTHBEARER
/** @tentative-return-type */
public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {}

/** @tentative-return-type */
public function oauthbearerSetTokenFailure(string $error): void {}
#endif
}
39 changes: 39 additions & 0 deletions kafka_consumer_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,35 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_queryWatermarkOffsets
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
ZEND_END_ARG_INFO()

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 1, IS_LONG, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
ZEND_END_ARG_INFO()

#if defined(HAS_RD_KAFKA_OAUTHBEARER)
#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 3, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 0, 3)
#endif
ZEND_ARG_TYPE_INFO(0, token_value, IS_STRING, 0)
ZEND_ARG_TYPE_INFO(0, lifetime_ms, IS_LONG, 0)
ZEND_ARG_TYPE_INFO(0, principal_name, IS_STRING, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 0, "[]")
ZEND_END_ARG_INFO()

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, error, IS_STRING, 0)
ZEND_END_ARG_INFO()
#endif

#define arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes arginfo_class_RdKafka_KafkaConsumer_getCommittedOffsets

#define arginfo_class_RdKafka_KafkaConsumer_pausePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions
Expand Down Expand Up @@ -162,6 +191,11 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets);
ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes);
ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions);
ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
ZEND_METHOD(RdKafka_KafkaConsumer, poll);
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken);
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure);
#endif

static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
Expand Down Expand Up @@ -191,6 +225,11 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, poll, arginfo_class_RdKafka_KafkaConsumer_poll, ZEND_ACC_PUBLIC)
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
};

Expand Down
155 changes: 155 additions & 0 deletions oauthbearer.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
+----------------------------------------------------------------------+
| php-rdkafka |
+----------------------------------------------------------------------+
| Copyright (c) 2025 Arnaud Le Blanc |
+----------------------------------------------------------------------+
| This source file is subject to version 3.01 of the PHP license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.php.net/license/3_01.txt |
| If you did not receive a copy of the PHP license and are unable to |
| obtain it through the world-wide-web, please send a note to |
| license@php.net so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Author: Martin Fris <rasta@lj.sk> |
+----------------------------------------------------------------------+
*/

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "php.h"
#include "php_rdkafka.h"
#include "php_rdkafka_priv.h"
#include "librdkafka/rdkafka.h"
#include "Zend/zend_exceptions.h"
#include "ext/spl/spl_exceptions.h"
#include "conf.h"
#include "topic_partition.h"
#include "topic.h"
#include "message.h"
#include "metadata.h"
#if PHP_VERSION_ID < 80000
#include "kafka_consumer_legacy_arginfo.h"
#else
#include "kafka_consumer_arginfo.h"
#endif

void oauthbearer_set_token(
rd_kafka_t *rk,
const char *token_value,
zend_long lifetime_ms,
const char *principal_name,
const HashTable *extensions_hash
) {
char errstr[512];
rd_kafka_resp_err_t ret = 0;

errstr[0] = '\0';

int extensions_size = 0;
char **extensions = NULL;

if (extensions_hash != NULL) {
extensions_size = zend_hash_num_elements(extensions_hash) * 2;
extensions = safe_emalloc((extensions_size * 2), sizeof(char *), 0);

int pos = 0;
zend_ulong num_key;
zend_string *extension_key_str;
zval *extension_zval;
ZEND_HASH_FOREACH_KEY_VAL(extensions_hash, num_key, extension_key_str, extension_zval) {
if (!extension_key_str) {
extension_key_str = zend_long_to_str(num_key);
extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str));
zend_string_release(extension_key_str);
} else {
extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str));
}

zend_string *tmp_extension_val_str;
zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str);
extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str));
if (tmp_extension_val_str) {
zend_string_release(tmp_extension_val_str);
}
} ZEND_HASH_FOREACH_END();
}

ret = rd_kafka_oauthbearer_set_token(
rk,
token_value,
lifetime_ms,
principal_name,
(const char **)extensions,
extensions_size,
errstr,
sizeof(errstr));

if (extensions != NULL) {
for (int i = 0; i < extensions_size; i++) {
efree(extensions[i]);
}
efree(extensions);
}

switch (ret) {
case RD_KAFKA_RESP_ERR__INVALID_ARG:
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG);
return;
case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED:
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED);
return;
case RD_KAFKA_RESP_ERR__STATE:
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE);
return;
case RD_KAFKA_RESP_ERR_NO_ERROR:
break;
default:
return;
}
}

void oauthbearer_set_token_failure(rd_kafka_t *rk, const char *errstr) {
rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(rk, errstr);

switch (ret) {
case RD_KAFKA_RESP_ERR__INVALID_ARG:
zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG);
return;
case RD_KAFKA_RESP_ERR__STATE:
zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE);
return;
case RD_KAFKA_RESP_ERR_NO_ERROR:
break;
default:
return;
}
}

int64_t zval_to_int64(zval *zval, const char *errstr) {
int64_t converted;

/* On 32-bits, it might be required to pass $lifetime_ms as a float or a
* string */
switch (Z_TYPE_P(zval)) {
case IS_LONG:
return (int64_t) Z_LVAL_P(zval);
break;
case IS_DOUBLE:
return (int64_t) Z_DVAL_P(zval);
break;
case IS_STRING:;
char *str = Z_STRVAL_P(zval);
char *end;
converted = (int64_t) strtoll(str, &end, 10);
if (end != str + Z_STRLEN_P(zval)) {
zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0, errstr);
return 0;
}
break;
EMPTY_SWITCH_DEFAULT_CASE();
}
}
30 changes: 30 additions & 0 deletions oauthbearer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
+----------------------------------------------------------------------+
| php-rdkafka |
+----------------------------------------------------------------------+
| Copyright (c) 2025 Arnaud Le Blanc |
+----------------------------------------------------------------------+
| This source file is subject to version 3.01 of the PHP license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.php.net/license/3_01.txt |
| If you did not receive a copy of the PHP license and are unable to |
| obtain it through the world-wide-web, please send a note to |
| license@php.net so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Author: Martin Fris <rasta@lj.sk> |
+----------------------------------------------------------------------+
*/

#include "librdkafka/rdkafka.h"

void oauthbearer_set_token(rd_kafka_t *rk,
const char *token_value,
zend_long lifetime_ms,
const char *principal_name,
const HashTable *extensions_hash
);

void oauthbearer_set_token_failure(rd_kafka_t *rk, const char *errstr);

int64_t zval_to_int64(zval *zval, const char *errstr);
3 changes: 3 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@
<file role="src" name="metadata_topic_legacy_arginfo.h"/>
<file role="src" name="php_rdkafka.h"/>
<file role="src" name="php_rdkafka_priv.h"/>
<file role="src" name="oauthbearer.c"/>
<file role="src" name="oauthbearer.h"/>
<file role="src" name="queue.c"/>
<file role="src" name="queue.h"/>
<file role="src" name="queue.stub.php"/>
Expand Down Expand Up @@ -136,6 +138,7 @@
<file role="test" name="metadata_topic_001.phpt"/>
<file role="test" name="new_topic_with_conf.phpt"/>
<file role="test" name="oauthbearer_integration.phpt"/>
<file role="test" name="oauthbearer_integration_hl.phpt"/>
<file role="test" name="pause_resume.phpt"/>
<file role="test" name="produce_consume.phpt"/>
<file role="test" name="produce_consume_queue.phpt"/>
Expand Down
Loading
Loading