Skip to content

Commit 600c8dc

Browse files
authored
Merge pull request #482 from alex268/update_commit_offsets2
Added support of readSessionId
2 parents e31d6b3 + c352575 commit 600c8dc

File tree

3 files changed

+22
-6
lines changed

3 files changed

+22
-6
lines changed

bom/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
<properties>
1717
<ydb-auth-api.version>1.0.0</ydb-auth-api.version>
18-
<ydb-proto-api.version>1.7.0</ydb-proto-api.version>
18+
<ydb-proto-api.version>1.7.1</ydb-proto-api.version>
1919
<yc-auth.version>2.2.0</yc-auth.version>
2020
</properties>
2121

topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -373,15 +373,19 @@ public AsyncReader createAsyncReader(ReaderSettings settings, ReadEventHandlersS
373373

374374
@Override
375375
public CompletableFuture<Status> commitOffset(String path, CommitOffsetSettings settings) {
376-
YdbTopic.CommitOffsetRequest request = YdbTopic.CommitOffsetRequest.newBuilder()
376+
YdbTopic.CommitOffsetRequest.Builder request = YdbTopic.CommitOffsetRequest.newBuilder()
377377
.setOperationParams(Operation.buildParams(settings))
378378
.setOffset(settings.getOffset())
379-
.setPath(path)
380379
.setConsumer(settings.getConsumer())
381380
.setPartitionId(settings.getPartitionId())
382-
.build();
381+
.setPath(path);
382+
383+
if (settings.getReadSessionId() != null) {
384+
request.setReadSessionId(settings.getReadSessionId());
385+
}
386+
383387
final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings);
384-
return topicRpc.commitOffset(request, grpcRequestSettings);
388+
return topicRpc.commitOffset(request.build(), grpcRequestSettings);
385389
}
386390

387391
@Override

topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ public class CommitOffsetSettings extends OperationSettings {
99
private final long partitionId;
1010
private final String consumer;
1111
private final long offset;
12+
private final String readSessionId;
1213

1314
private CommitOffsetSettings(Builder builder) {
1415
super(builder);
1516
this.partitionId = builder.partitionId;
1617
this.consumer = builder.consumer;
1718
this.offset = builder.offset;
19+
this.readSessionId = builder.readSessionId;
1820
}
1921

2022
public static Builder newBuilder() {
@@ -33,13 +35,18 @@ public long getOffset() {
3335
return offset;
3436
}
3537

36-
/**
38+
public String getReadSessionId() {
39+
return readSessionId;
40+
}
41+
42+
/*
3743
* BUILDER
3844
*/
3945
public static class Builder extends OperationBuilder<Builder> {
4046
private long partitionId = -1;
4147
private String consumer = null;
4248
private long offset = 0;
49+
private String readSessionId = null;
4350

4451
public Builder setPartitionId(long partitionId) {
4552
this.partitionId = partitionId;
@@ -56,6 +63,11 @@ public Builder setOffset(long offset) {
5663
return this;
5764
}
5865

66+
public Builder setReadSessionId(String sessionId) {
67+
this.readSessionId = sessionId;
68+
return this;
69+
}
70+
5971
@Override
6072
public CommitOffsetSettings build() {
6173
return new CommitOffsetSettings(this);

0 commit comments

Comments
 (0)