Skip to content

Commit 07d7114

Browse files
niksavelievGazizonoki
authored andcommitted
Commit for autopartitioned topics (#11629)
1 parent 342a24a commit 07d7114

File tree

6 files changed

+25
-5
lines changed

6 files changed

+25
-5
lines changed

include/ydb-cpp-sdk/client/topic/control_plane.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -766,6 +766,8 @@ struct TDescribePartitionSettings: public TOperationRequestSettings<TDescribePar
766766
};
767767

768768
// Settings for commit offset request.
769-
struct TCommitOffsetSettings : public TOperationRequestSettings<TCommitOffsetSettings> {};
769+
struct TCommitOffsetSettings : public TOperationRequestSettings<TCommitOffsetSettings> {
770+
FLUENT_SETTING_OPTIONAL(std::string, ReadSessionId);
771+
};
770772

771773
} // namespace NYdb::NTopic

include/ydb-cpp-sdk/client/topic/read_events.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ struct TPartitionSession: public TThrRefBase, public TPrintable<TPartitionSessio
3131
return TopicPath;
3232
}
3333

34+
//! Read session id.
35+
const std::string& GetReadSessionId() const {
36+
return ReadSessionId;
37+
}
38+
3439
//! Partition id.
3540
uint64_t GetPartitionId() const {
3641
return PartitionId;
@@ -39,6 +44,7 @@ struct TPartitionSession: public TThrRefBase, public TPrintable<TPartitionSessio
3944
protected:
4045
uint64_t PartitionSessionId;
4146
std::string TopicPath;
47+
std::string ReadSessionId;
4248
uint64_t PartitionId;
4349
};
4450

src/api/protos/ydb_topic.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -752,9 +752,10 @@ message CommitOffsetRequest {
752752
int64 partition_id = 3;
753753
// Path of consumer.
754754
string consumer = 4;
755-
756755
// Processed offset.
757756
int64 offset = 5;
757+
// Read session identifier from StreamRead RPC.
758+
string read_session_id = 6;
758759
}
759760

760761
// Commit offset response sent from server to client.

src/client/topic/impl/read_session_impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
605605
template <bool V = UseMigrationProtocol, class = std::enable_if_t<!V>>
606606
TPartitionStreamImpl(ui64 partitionStreamId,
607607
std::string topicPath,
608+
std::string readSessionId,
608609
i64 partitionId,
609610
i64 assignId,
610611
i64 readOffset,
@@ -617,6 +618,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
617618
{
618619
TAPartitionStream<false>::PartitionSessionId = partitionStreamId;
619620
TAPartitionStream<false>::TopicPath = std::move(topicPath);
621+
TAPartitionStream<false>::ReadSessionId = std::move(readSessionId);
620622
TAPartitionStream<false>::PartitionId = static_cast<ui64>(partitionId);
621623
MaxCommittedOffset = static_cast<ui64>(readOffset);
622624
}
@@ -1333,6 +1335,7 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
13331335
const std::string Database;
13341336
const std::string SessionId;
13351337
const std::string ClusterName;
1338+
std::string ReadSessionId;
13361339
TLog Log;
13371340
ui64 NextPartitionStreamId;
13381341
ui64 PartitionStreamIdStep;

src/client/topic/impl/read_session_impl.ipp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -971,6 +971,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
971971
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id());
972972

973973
RetryState = nullptr;
974+
ReadSessionId = msg.session_id();
974975

975976
// Successful init. Do nothing.
976977
ContinueReadingDataImpl();
@@ -1222,6 +1223,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
12221223
Y_UNUSED(deferred);
12231224

12241225
RetryState = nullptr;
1226+
ReadSessionId = msg.session_id();
12251227

12261228
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id());
12271229

@@ -1321,8 +1323,12 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
13211323
Y_ABORT_UNLESS(Lock.IsLocked());
13221324

13231325
auto partitionStream = MakeIntrusive<TPartitionStreamImpl<false>>(
1324-
NextPartitionStreamId, msg.partition_session().path(), msg.partition_session().partition_id(),
1325-
msg.partition_session().partition_session_id(), msg.committed_offset(),
1326+
NextPartitionStreamId,
1327+
msg.partition_session().path(),
1328+
ReadSessionId,
1329+
msg.partition_session().partition_id(),
1330+
msg.partition_session().partition_session_id(),
1331+
msg.committed_offset(),
13261332
SelfContext);
13271333
NextPartitionStreamId += PartitionStreamIdStep;
13281334

src/client/topic/impl/topic_impl.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,9 @@ class TTopicClient::TImpl : public TClientImplCommon<TTopicClient::TImpl> {
282282
request.set_partition_id(partitionId);
283283
request.set_consumer(TStringType{consumerName});
284284
request.set_offset(offset);
285-
285+
if (settings.ReadSessionId_) {
286+
request.set_read_session_id(*settings.ReadSessionId_);
287+
}
286288
return RunSimple<Ydb::Topic::V1::TopicService, Ydb::Topic::CommitOffsetRequest, Ydb::Topic::CommitOffsetResponse>(
287289
std::move(request),
288290
&Ydb::Topic::V1::TopicService::Stub::AsyncCommitOffset,

0 commit comments

Comments
 (0)