Skip to content

Commit 7a7e8e6

Browse files
qyryqgithub-actions[bot]
authored andcommitted
Topic SDK direct read (#18263)
1 parent 8da65e9 commit 7a7e8e6

File tree

17 files changed

+4103
-107
lines changed

17 files changed

+4103
-107
lines changed

.github/last_commit.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
984787760ea6d7f0bbf954dc56d979d45d3aaf61
1+
3c6c01111a028a661c25d7ad10b9c81c755f334b

include/ydb-cpp-sdk/client/topic/control_plane.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ class TPartitionConsumerStats {
128128
class TPartitionLocation {
129129
public:
130130
TPartitionLocation(const Ydb::Topic::PartitionLocation& partitionLocation);
131+
TPartitionLocation(std::int32_t nodeId, std::int64_t generation);
131132
int32_t GetNodeId() const;
132133
int64_t GetGeneration() const;
133134

include/ydb-cpp-sdk/client/topic/read_events.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
#pragma once
22

33
#include "codecs.h"
4+
#include "control_plane.h"
45
#include "events_common.h"
56

67
#include <util/datetime/base.h>
78

9+
810
namespace NYdb::inline V3::NTopic {
911

1012
//! Partition session.
@@ -42,10 +44,14 @@ struct TPartitionSession: public TThrRefBase, public TPrintable<TPartitionSessio
4244
}
4345

4446
protected:
47+
4548
uint64_t PartitionSessionId;
4649
std::string TopicPath;
4750
std::string ReadSessionId;
4851
uint64_t PartitionId;
52+
std::optional<TPartitionLocation> Location;
53+
/*TDirectReadId*/ std::int64_t NextDirectReadId = 1;
54+
std::optional</*TDirectReadId*/ std::int64_t> LastDirectReadId;
4955
};
5056

5157
template<>

include/ydb-cpp-sdk/client/topic/read_session.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,9 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> {
193193
//! AutoPartitioningSupport.
194194
FLUENT_SETTING_DEFAULT(bool, AutoPartitioningSupport, false);
195195

196+
// TODO(qyryq) Uncomment when direct read is ready.
197+
// FLUENT_SETTING_DEFAULT(bool, DirectRead, false);
198+
196199
//! Log.
197200
FLUENT_SETTING_OPTIONAL(TLog, Log);
198201
};

src/client/persqueue_public/ut/read_session_ut.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,6 @@ class TReadSessionImplTestSetup {
513513
std::shared_ptr<TCallbackContext<TSingleClusterReadSessionImpl>> CbContext;
514514
std::shared_ptr<TThreadPool> ThreadPool;
515515
::IExecutor::TPtr DefaultExecutor;
516-
std::shared_ptr<std::unordered_map<ECodec, THolder<NTopic::ICodec>>> ProvidedCodecs = std::make_shared<std::unordered_map<ECodec, THolder<NTopic::ICodec>>>();
517516
};
518517

519518
class TReorderingExecutor : public ::IExecutor {
@@ -588,10 +587,6 @@ TReadSessionImplTestSetup::TReadSessionImplTestSetup() {
588587
.Counters(MakeIntrusive<NYdb::NPersQueue::TReaderCounters>(MakeIntrusive<::NMonitoring::TDynamicCounters>()));
589588

590589
Log.SetFormatter(GetPrefixLogFormatter(""));
591-
592-
(*ProvidedCodecs)[ECodec::GZIP] = MakeHolder<NTopic::TGzipCodec>();
593-
(*ProvidedCodecs)[ECodec::LZOP] = MakeHolder<NTopic::TUnsupportedCodec>();
594-
(*ProvidedCodecs)[ECodec::ZSTD] = MakeHolder<NTopic::TZstdCodec>();
595590
}
596591

597592
TReadSessionImplTestSetup::~TReadSessionImplTestSetup() noexcept(false) {

src/client/persqueue_public/ut/ut_utils/ut_utils.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,22 @@ using NYdb::NTopic::IAsyncExecutor;
1414

1515
namespace NYdb::NPersQueue::NTests {
1616

17+
struct TPersQueueYdbSdkTestSetupSettings {
18+
TString TestCaseName;
19+
bool Start = true;
20+
TVector<NKikimrServices::EServiceKikimr> LogServices = ::NPersQueue::TTestServer::LOGGED_SERVICES;
21+
NActors::NLog::EPriority LogPriority = NActors::NLog::PRI_DEBUG;
22+
ui32 NodeCount = NKikimr::NPersQueueTests::PQ_DEFAULT_NODE_COUNT;
23+
size_t TopicPartitionsCount = 1;
24+
};
25+
1726
class TPersQueueYdbSdkTestSetup : public ::NPersQueue::SDKTestSetup {
1827
THolder<NYdb::TDriver> Driver;
1928
THolder<NYdb::NPersQueue::TPersQueueClient> PersQueueClient;
2029

2130
TAdaptiveLock Lock;
2231
public:
32+
// TODO(qyryq) Delete this ctor in favor of TPersQueueYdbSdkTestSetupSettings.
2333
TPersQueueYdbSdkTestSetup(const TString& testCaseName, bool start = true,
2434
const TVector<NKikimrServices::EServiceKikimr>& logServices = ::NPersQueue::TTestServer::LOGGED_SERVICES,
2535
NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG,
@@ -29,6 +39,17 @@ class TPersQueueYdbSdkTestSetup : public ::NPersQueue::SDKTestSetup {
2939
{
3040
}
3141

42+
TPersQueueYdbSdkTestSetup(TPersQueueYdbSdkTestSetupSettings settings)
43+
: SDKTestSetup(
44+
settings.TestCaseName,
45+
settings.Start,
46+
settings.LogServices,
47+
settings.LogPriority,
48+
settings.NodeCount,
49+
settings.TopicPartitionsCount)
50+
{
51+
}
52+
3253
~TPersQueueYdbSdkTestSetup() {
3354
if (PersQueueClient) {
3455
PersQueueClient = nullptr;

0 commit comments

Comments
 (0)