Skip to content

Commit ad53bce

Browse files
Rastusikarnaud-lb
authored andcommitted
adding poll method to KafkaConsumer
1 parent 805aac0 commit ad53bce

File tree

4 files changed

+55
-10
lines changed

4 files changed

+55
-10
lines changed

kafka_consumer.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,26 @@ PHP_METHOD(RdKafka_KafkaConsumer, resumePartitions)
871871
}
872872
/* }}} */
873873

874+
/* {{{ proto int RdKafka::poll(int $timeout_ms)
875+
Polls the provided kafka handle for events */
876+
PHP_METHOD(RdKafka_KafkaConsumer, poll)
877+
{
878+
object_intern *intern;
879+
zend_long timeout;
880+
881+
if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) {
882+
return;
883+
}
884+
885+
intern = get_object(getThis());
886+
if (!intern) {
887+
return;
888+
}
889+
890+
RETURN_LONG(rd_kafka_poll(intern->rk, timeout));
891+
}
892+
/* }}} */
893+
874894
#ifdef HAS_RD_KAFKA_OAUTHBEARER
875895
/* {{{ proto void RdKafka\KafkaConsumer::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = [])
876896
* Set SASL/OAUTHBEARER token and metadata

kafka_consumer.stub.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ public function pausePartitions(array $topic_partitions): array {}
8282
/** @tentative-return-type */
8383
public function resumePartitions(array $topic_partitions): array {}
8484

85+
/** @tentative-return-type */
86+
public function poll(int $timeout_ms): int {}
87+
8588
#ifdef HAS_RD_KAFKA_OAUTHBEARER
8689
/** @tentative-return-type */
8790
public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {}

kafka_consumer_arginfo.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,14 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_queryWatermarkOffsets
129129
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
130130
ZEND_END_ARG_INFO()
131131

132+
#if (PHP_VERSION_ID >= 80100)
133+
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 1, IS_LONG, 0)
134+
#else
135+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 0, 1)
136+
#endif
137+
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
138+
ZEND_END_ARG_INFO()
139+
132140
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
133141
#if (PHP_VERSION_ID >= 80100)
134142
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 3, IS_VOID, 0)
@@ -183,6 +191,7 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets);
183191
ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes);
184192
ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions);
185193
ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
194+
ZEND_METHOD(RdKafka_KafkaConsumer, poll);
186195
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
187196
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken);
188197
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure);
@@ -216,6 +225,7 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
216225
ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC)
217226
ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC)
218227
ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC)
228+
ZEND_ME(RdKafka_KafkaConsumer, poll, arginfo_class_RdKafka_KafkaConsumer_poll, ZEND_ACC_PUBLIC)
219229
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
220230
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC)
221231
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)

tests/oauthbearer_integration_hl.phpt

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,6 @@ $producer->poll(0);
6060
$topicName = sprintf("test_rdkafka_%s", uniqid());
6161
$topic = $producer->newTopic($topicName);
6262

63-
try {
64-
$producer->getMetadata(false, $topic, 10*1000);
65-
echo "Metadata retrieved successfully when refresh callback set token\n";
66-
} catch (\RdKafka\Exception $e) {
67-
echo "FAIL: Caught exception when getting metadata after successfully refreshing any token:\n";
68-
printf("%s: %s\n", get_class($e), $e->getMessage());
69-
}
70-
7163
echo "Writing test data\n";
7264
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Test");
7365
$producer->poll(0);
@@ -89,7 +81,7 @@ $confConsumer->setErrorCb(function ($producer, $err, $errstr) {
8981
printf("%s: %s\n", rd_kafka_err2str($err), $errstr);
9082
});
9183

92-
// Test that refresh token with setting token accurately will succeed when getting metadata
84+
// Test that refresh token with setting token accurately will succeed when consuming data
9385
$confConsumer->setOauthbearerTokenRefreshCb(function ($consumer) {
9486
echo "Refreshing token and succeeding\n";
9587
$token = generateJws();
@@ -118,9 +110,26 @@ $message = $consumer->consume(500);
118110
echo $message->err === -185 ? "Received empty message when reading data after not setting or refreshing any token\n" :
119111
"FAIL: Did receive a message after not setting or refreshing any token\n";
120112

113+
// Test that metadata will be loaded before data consumption, under the condition that poll is called
114+
$confConsumer->setOauthbearerTokenRefreshCb(function ($consumer) {
115+
echo "Refreshing token on poll and succeeding\n";
116+
$token = generateJws();
117+
$consumer->oauthbearerSetToken($token['value'], (string) $token['expiryMs'], $token['principal']);
118+
});
119+
$consumer = new \RdKafka\KafkaConsumer($confConsumer);
120+
$consumerTopic = $consumer->newTopic($topicName);
121+
$consumer->poll(0);
122+
123+
try {
124+
echo "Reading metadata\n";
125+
$consumer->getMetadata(false, $consumerTopic, 1000);
126+
echo "Metadata was fetched successfully after calling poll\n";
127+
} catch (\RdKafka\Exception $e) {
128+
echo "FAIL: Caught exception when getting metadata after calling poll\n";
129+
}
130+
121131
--EXPECT--
122132
Refreshing token and succeeding
123-
Metadata retrieved successfully when refresh callback set token
124133
Writing test data
125134
Write successful
126135
Reading data
@@ -131,3 +140,6 @@ Reading data
131140
Setting token failure in refresh cb
132141
Local: Authentication failure: Failed to acquire SASL OAUTHBEARER token: Token failure before data consumption
133142
Received empty message when reading data after not setting or refreshing any token
143+
Refreshing token on poll and succeeding
144+
Reading metadata
145+
Metadata was fetched successfully after calling poll

0 commit comments

Comments
 (0)