Skip to content

YDB C++ SDK Import 8 #398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmake/testing.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ function(add_ydb_test)
WORKING_DIRECTORY
${YDB_TEST_WORKING_DIRECTORY}
)
target_link_libraries(${YDB_TEST_NAME} PRIVATE
GTest::gtest_main
)
else()
add_yunittest(
NAME
Expand All @@ -146,6 +149,9 @@ function(add_ydb_test)
WORKING_DIRECTORY
${YDB_TEST_WORKING_DIRECTORY}
)
target_link_libraries(${YDB_TEST_NAME} PRIVATE
cpp-testing-unittest_main
)
endif()

set_yunittest_property(
Expand Down
6 changes: 5 additions & 1 deletion include/ydb-cpp-sdk/client/topic/control_plane.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class TPartitionConsumerStats {
const TInstant& GetLastReadTime() const;
const TDuration& GetMaxReadTimeLag() const;
const TDuration& GetMaxWriteTimeLag() const;
const TDuration& GetMaxCommittedTimeLag() const;

private:
uint64_t CommittedOffset_;
Expand All @@ -120,6 +121,7 @@ class TPartitionConsumerStats {
TInstant LastReadTime_;
TDuration MaxReadTimeLag_;
TDuration MaxWriteTimeLag_;
TDuration MaxCommittedTimeLag_;
};

// Topic partition location
Expand Down Expand Up @@ -766,6 +768,8 @@ struct TDescribePartitionSettings: public TOperationRequestSettings<TDescribePar
};

// Settings for commit offset request.
struct TCommitOffsetSettings : public TOperationRequestSettings<TCommitOffsetSettings> {};
struct TCommitOffsetSettings : public TOperationRequestSettings<TCommitOffsetSettings> {
FLUENT_SETTING_OPTIONAL(std::string, ReadSessionId);
};

} // namespace NYdb::NTopic
6 changes: 6 additions & 0 deletions include/ydb-cpp-sdk/client/topic/read_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ struct TPartitionSession: public TThrRefBase, public TPrintable<TPartitionSessio
return TopicPath;
}

//! Read session id.
const std::string& GetReadSessionId() const {
return ReadSessionId;
}

//! Partition id.
uint64_t GetPartitionId() const {
return PartitionId;
Expand All @@ -39,6 +44,7 @@ struct TPartitionSession: public TThrRefBase, public TPrintable<TPartitionSessio
protected:
uint64_t PartitionSessionId;
std::string TopicPath;
std::string ReadSessionId;
uint64_t PartitionId;
};

Expand Down
7 changes: 6 additions & 1 deletion src/api/protos/ydb_topic.proto
Original file line number Diff line number Diff line change
Expand Up @@ -752,9 +752,10 @@ message CommitOffsetRequest {
int64 partition_id = 3;
// Path of consumer.
string consumer = 4;

// Processed offset.
int64 offset = 5;
// Read session identifier from StreamRead RPC.
string read_session_id = 6;
}

// Commit offset response sent from server to client.
Expand Down Expand Up @@ -809,6 +810,8 @@ message Consumer {
google.protobuf.Duration max_read_time_lag = 2;
// Maximum of differences between write timestamp and create timestamp for all messages, read during last minute.
google.protobuf.Duration max_write_time_lag = 3;
// The difference between the write timestamp of the last commited message and the current time.
google.protobuf.Duration max_committed_time_lag = 5;
// Bytes read statistics.
MultipleWindowsStat bytes_read = 4;
}
Expand Down Expand Up @@ -1213,6 +1216,8 @@ message DescribeConsumerResult {
google.protobuf.Duration max_read_time_lag = 6;
// Maximum of differences between write timestamp and create timestamp for all messages, read during last minute.
google.protobuf.Duration max_write_time_lag = 7;
// The difference between the write timestamp of the last commited message and the current time.
google.protobuf.Duration max_committed_time_lag = 13;

// How much bytes were read during several windows statistics from this partition.
MultipleWindowsStat bytes_read = 8;
Expand Down
3 changes: 3 additions & 0 deletions src/client/topic/impl/read_session_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
template <bool V = UseMigrationProtocol, class = std::enable_if_t<!V>>
TPartitionStreamImpl(ui64 partitionStreamId,
std::string topicPath,
std::string readSessionId,
i64 partitionId,
i64 assignId,
i64 readOffset,
Expand All @@ -617,6 +618,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
{
TAPartitionStream<false>::PartitionSessionId = partitionStreamId;
TAPartitionStream<false>::TopicPath = std::move(topicPath);
TAPartitionStream<false>::ReadSessionId = std::move(readSessionId);
TAPartitionStream<false>::PartitionId = static_cast<ui64>(partitionId);
MaxCommittedOffset = static_cast<ui64>(readOffset);
}
Expand Down Expand Up @@ -1333,6 +1335,7 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
const std::string Database;
const std::string SessionId;
const std::string ClusterName;
std::string ReadSessionId;
TLog Log;
ui64 NextPartitionStreamId;
ui64 PartitionStreamIdStep;
Expand Down
10 changes: 8 additions & 2 deletions src/client/topic/impl/read_session_impl.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id());

RetryState = nullptr;
ReadSessionId = msg.session_id();

// Successful init. Do nothing.
ContinueReadingDataImpl();
Expand Down Expand Up @@ -1222,6 +1223,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
Y_UNUSED(deferred);

RetryState = nullptr;
ReadSessionId = msg.session_id();

LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id());

Expand Down Expand Up @@ -1321,8 +1323,12 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
Y_ABORT_UNLESS(Lock.IsLocked());

auto partitionStream = MakeIntrusive<TPartitionStreamImpl<false>>(
NextPartitionStreamId, msg.partition_session().path(), msg.partition_session().partition_id(),
msg.partition_session().partition_session_id(), msg.committed_offset(),
NextPartitionStreamId,
msg.partition_session().path(),
ReadSessionId,
msg.partition_session().partition_id(),
msg.partition_session().partition_session_id(),
msg.committed_offset(),
SelfContext);
NextPartitionStreamId += PartitionStreamIdStep;

Expand Down
5 changes: 5 additions & 0 deletions src/client/topic/impl/topic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ TPartitionConsumerStats::TPartitionConsumerStats(const Ydb::Topic::DescribeConsu
, LastReadTime_(TInstant::Seconds(partitionStats.last_read_time().seconds()))
, MaxReadTimeLag_(TDuration::Seconds(partitionStats.max_read_time_lag().seconds()))
, MaxWriteTimeLag_(TDuration::Seconds(partitionStats.max_write_time_lag().seconds()))
, MaxCommittedTimeLag_(TDuration::Seconds(partitionStats.max_committed_time_lag().seconds()))
{}

uint64_t TPartitionConsumerStats::GetCommittedOffset() const {
Expand Down Expand Up @@ -406,6 +407,10 @@ const TDuration& TPartitionConsumerStats::GetMaxWriteTimeLag() const {
return MaxWriteTimeLag_;
}

const TDuration& TPartitionConsumerStats::GetMaxCommittedTimeLag() const {
return MaxCommittedTimeLag_;
}

TPartitionLocation::TPartitionLocation(const Ydb::Topic::PartitionLocation& partitionLocation)
: NodeId_(partitionLocation.node_id())
, Generation_(partitionLocation.generation())
Expand Down
4 changes: 3 additions & 1 deletion src/client/topic/impl/topic_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,9 @@ class TTopicClient::TImpl : public TClientImplCommon<TTopicClient::TImpl> {
request.set_partition_id(partitionId);
request.set_consumer(TStringType{consumerName});
request.set_offset(offset);

if (settings.ReadSessionId_) {
request.set_read_session_id(*settings.ReadSessionId_);
}
return RunSimple<Ydb::Topic::V1::TopicService, Ydb::Topic::CommitOffsetRequest, Ydb::Topic::CommitOffsetResponse>(
std::move(request),
&Ydb::Topic::V1::TopicService::Stub::AsyncCommitOffset,
Expand Down
21 changes: 21 additions & 0 deletions src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,27 @@ TTopicDescription TTopicSdkTestSetup::DescribeTopic(const TString& path)
return status.GetTopicDescription();
}

TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const TString& path, const TString& consumer)
{
TTopicClient client(MakeDriver());

TDescribeConsumerSettings settings;
settings.IncludeStats(true);
settings.IncludeLocation(true);

auto status = client.DescribeConsumer(path, consumer, settings).GetValueSync();
UNIT_ASSERT(status.IsSuccess());

return status.GetConsumerDescription();
}

TStatus TTopicSdkTestSetup::Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset) {
TTopicClient client(MakeDriver());

return client.CommitOffset(path, partitionId, consumerName, offset).GetValueSync();
}


TString TTopicSdkTestSetup::GetEndpoint() const {
return "localhost:" + ToString(Server.GrpcPort);
}
Expand Down
3 changes: 3 additions & 0 deletions src/client/topic/ut/ut_utils/topic_sdk_test_setup.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class TTopicSdkTestSetup {
size_t maxPartitionCount = 100);

TTopicDescription DescribeTopic(const TString& path = TString{TEST_TOPIC});
TConsumerDescription DescribeConsumer(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TString{TEST_CONSUMER});

TStatus Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset);

TString GetEndpoint() const;
TString GetTopicPath(const TString& name = TString{TEST_TOPIC}) const;
Expand Down
3 changes: 1 addition & 2 deletions tests/integration/basic_example/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
add_ydb_test(NAME basic-example
add_ydb_test(NAME basic-example GTEST
SOURCES
main.cpp
basic_example_data.cpp
Expand All @@ -9,7 +9,6 @@ add_ydb_test(NAME basic-example
YDB-CPP-SDK::Driver
YDB-CPP-SDK::Proto
YDB-CPP-SDK::Table
GTest::gtest_main
LABELS
integration
)
3 changes: 1 addition & 2 deletions tests/integration/bulk_upsert/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
add_ydb_test(NAME bulk_upsert
add_ydb_test(NAME bulk_upsert GTEST
SOURCES
main.cpp
bulk_upsert.cpp
bulk_upsert.h
LINK_LIBRARIES
yutil
YDB-CPP-SDK::Table
GTest::gtest_main
LABELS
integration
)
3 changes: 1 addition & 2 deletions tests/integration/server_restart/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
add_ydb_test(NAME server_restart
add_ydb_test(NAME server_restart GTEST
SOURCES
main.cpp
LINK_LIBRARIES
yutil
api-grpc
YDB-CPP-SDK::Query
gRPC::grpc++
GTest::gtest_main
LABELS
integration
)
49 changes: 19 additions & 30 deletions tests/unit/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ add_ydb_test(NAME client-ydb_coordination_ut
coordination/coordination_ut.cpp
LINK_LIBRARIES
yutil
cpp-testing-unittest_main
YDB-CPP-SDK::Coordination
api-grpc
LABELS
Expand All @@ -15,7 +14,6 @@ add_ydb_test(NAME client-extensions-discovery_mutator_ut
discovery_mutator/discovery_mutator_ut.cpp
LINK_LIBRARIES
yutil
cpp-testing-unittest_main
YDB-CPP-SDK::DiscoveryMutator
YDB-CPP-SDK::Table
LABELS
Expand All @@ -27,7 +25,6 @@ add_ydb_test(NAME client-ydb_driver_ut
driver/driver_ut.cpp
LINK_LIBRARIES
yutil
cpp-testing-unittest_main
YDB-CPP-SDK::Driver
YDB-CPP-SDK::Table
LABELS
Expand All @@ -41,7 +38,6 @@ add_ydb_test(NAME client-impl-ydb_endpoints_ut
endpoints/endpoints_ut.cpp
LINK_LIBRARIES
yutil
cpp-testing-unittest_main
client-impl-ydb_endpoints
LABELS
unit
Expand All @@ -53,7 +49,6 @@ add_ydb_test(NAME client-oauth2_ut
oauth2_token_exchange/jwt_token_source_ut.cpp
LINK_LIBRARIES
yutil
cpp-testing-unittest_main
http-server
json
string_utils-base64
Expand All @@ -62,40 +57,34 @@ add_ydb_test(NAME client-oauth2_ut
unit
)

# add_ydb_test(NAME client-ydb_params_ut
# SOURCES
# params/params_ut.cpp
# LINK_LIBRARIES
# yutil
# cpp-testing-unittest_main
# YDB-CPP-SDK::Params
# YDB-CPP-SDK::YsonValue
# LABELS
# unit
# )
add_ydb_test(NAME client-ydb_params_ut GTEST
SOURCES
params/params_ut.cpp
LINK_LIBRARIES
yutil
YDB-CPP-SDK::Params
LABELS
unit
)

add_ydb_test(NAME client-ydb_result_ut
SOURCES
result/result_ut.cpp
LINK_LIBRARIES
yutil
cpp-testing-unittest_main
YDB-CPP-SDK::Result
YDB-CPP-SDK::Params
LABELS
unit
)

# add_ydb_test(NAME client-ydb_value_ut
# SOURCES
# value/value_ut.cpp
# LINK_LIBRARIES
# yutil
# cpp-testing-unittest_main
# YDB-CPP-SDK::Value
# YDB-CPP-SDK::JsonValue
# YDB-CPP-SDK::YsonValue
# YDB-CPP-SDK::Params
# LABELS
# unit
# )
add_ydb_test(NAME client-ydb_value_ut GTEST
SOURCES
value/value_ut.cpp
LINK_LIBRARIES
yutil
YDB-CPP-SDK::Value
YDB-CPP-SDK::Params
LABELS
unit
)
Loading
Loading