Skip to content

Commit d86e221

Browse files
authored
Merge pull request #82 from s-ourabh/stickyDequeue-1-Support
Sticky dequeue 1 support
2 parents 207f8ca + dcfe305 commit d86e221

File tree

2 files changed

+10
-7
lines changed

2 files changed

+10
-7
lines changed

clients/src/main/java/org/oracle/okafka/clients/consumer/KafkaConsumer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import org.oracle.okafka.clients.CommonClientConfigs;
7979
import org.oracle.okafka.clients.Metadata;
8080
import org.oracle.okafka.clients.NetworkClient;
81+
import org.oracle.okafka.clients.TopicTeqParameters;
8182
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
8283
import org.oracle.okafka.clients.consumer.internals.ConsumerNetworkClient;
8384
import org.oracle.okafka.clients.consumer.internals.FetchMetricsRegistry;
@@ -1215,13 +1216,13 @@ record = new ConsumerRecord<>("", -1, -1, -1, TimestampType.NO_TIMESTAMP_TYPE, n
12151216
subscriptions.position(tp, new FetchPosition(record.offset(), Optional.empty(),
12161217
new LeaderAndEpoch(Optional.empty(), Optional.empty())));
12171218
} catch (IllegalStateException isE) {
1218-
if (metadata.getDBMajorVersion() < 23) {
1219+
TopicTeqParameters teqParam = metadata.topicParaMap.get(topic);
1220+
int stickyDeqParam = teqParam != null ? teqParam.getStickyDeq(): 2;
1221+
if (metadata.getDBMajorVersion() < 23 || stickyDeqParam == 1) {
12191222
// Partition assigned by TEQ Server not through JoinGroup/Sync
12201223
subscriptions.assignFromSubscribed(Collections.singleton(tp));
12211224
subscriptions.seek(tp, 0);
12221225
subscriptions.completeValidation(tp);
1223-
subscriptions.position(tp, new FetchPosition(record.offset(), Optional.empty(),
1224-
new LeaderAndEpoch(Optional.empty(), Optional.empty())));
12251226
}
12261227
subscriptions.position(tp, new FetchPosition(record.offset(), Optional.empty(),
12271228
new LeaderAndEpoch(Optional.empty(), Optional.empty())));

clients/src/main/java/org/oracle/okafka/clients/consumer/internals/ConsumerNetworkClient.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.oracle.okafka.clients.KafkaClient;
6464
import org.oracle.okafka.clients.Metadata;
6565
import org.oracle.okafka.clients.NetworkClient;
66+
import org.oracle.okafka.clients.TopicTeqParameters;
6667
import org.oracle.okafka.clients.consumer.KafkaConsumer.FetchManagerMetrics;
6768
import org.oracle.okafka.clients.consumer.internals.SubscriptionState.FetchPosition;
6869
import org.apache.kafka.clients.RequestCompletionHandler;
@@ -250,7 +251,9 @@ public void onComplete(ClientResponse response) {
250251
Node node = poll.getKey();
251252
log.debug("Fetch Records for topic " + poll.getValue() + " from host " + node );
252253
String topic = poll.getValue();
253-
if(metadata.topicParaMap.get(topic).getStickyDeq() != 2) {
254+
TopicTeqParameters teqParam = metadata.topicParaMap.get(topic);
255+
int stickyDeqParam = teqParam != null ? teqParam.getStickyDeq(): 2;
256+
if(stickyDeqParam == 0) {
254257
String errMsg = "Topic " + topic + " is not an Oracle kafka topic, Please drop and re-create topic"
255258
+" using Admin.createTopics() or dbms_aqadm.create_database_kafka_topic procedure";
256259
throw new InvalidTopicException(errMsg);
@@ -278,7 +281,7 @@ public void onComplete(ClientResponse response) {
278281

279282
return this.messages;
280283
}
281-
284+
282285
/**
283286
*
284287
* @return map of <node , topic> . Every node is leader for its corresponding topic.
@@ -397,7 +400,6 @@ private void joinGroupifNeeded(ClientResponse response, long timeoutMs) {
397400
long elapsed = response.requestLatencyMs();
398401
long prevTime = time.milliseconds();
399402
long current;
400-
401403
while(elapsed < timeoutMs && rejoinNeeded(exception)) {
402404
log.debug("JoinGroup Is Needed");
403405
if (needsJoinPrepare) {
@@ -415,6 +417,7 @@ private void joinGroupifNeeded(ClientResponse response, long timeoutMs) {
415417
elapsed = elapsed + (current - prevTime);
416418
prevTime = current;
417419
}
420+
418421
} catch(Exception e)
419422
{
420423
log.error(e.getMessage(), e);
@@ -691,7 +694,6 @@ public boolean mayBeTriggerSubcription(long timeout) {
691694

692695
if(!subscriptions.subscription().equals(subscriptionSnapshot)) {
693696
boolean noSubExist = false;
694-
rejoin = true;
695697
String topic = getSubscribableTopics();
696698
long now = time.milliseconds();
697699
Node node = client.leastLoadedNode(now);

0 commit comments

Comments
 (0)