Skip to content

Commit bd170a8

Browse files
ildar-khisambeevGazizonoki
authored andcommitted
Moved "refactor persqueue sdk" commit from ydb repo
1 parent f38e36b commit bd170a8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+2948
-2969
lines changed

src/client/topic/codecs/codecs.h renamed to include/ydb-cpp-sdk/client/topic/codecs/codecs.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,14 @@
1313

1414
namespace NYdb::NTopic {
1515

16+
enum class ECodec : ui32 {
17+
RAW = 1,
18+
GZIP = 2,
19+
LZOP = 3,
20+
ZSTD = 4,
21+
CUSTOM = 10000,
22+
};
23+
1624
class ICodec {
1725
public:
1826
virtual ~ICodec() = default;
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
#pragma once
2+
3+
#include <ydb-cpp-sdk/library/monlib/dynamic_counters/counters.h>
4+
5+
namespace NYdb::NTopic {
6+
7+
using TCounterPtr = ::NMonitoring::TDynamicCounters::TCounterPtr;
8+
9+
struct TWriterCounters : public TThrRefBase {
10+
using TSelf = TWriterCounters;
11+
using TPtr = TIntrusivePtr<TSelf>;
12+
13+
explicit TWriterCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) {
14+
Errors = counters->GetCounter("errors", true);
15+
CurrentSessionLifetimeMs = counters->GetCounter("currentSessionLifetimeMs", false);
16+
BytesWritten = counters->GetCounter("bytesWritten", true);
17+
MessagesWritten = counters->GetCounter("messagesWritten", true);
18+
BytesWrittenCompressed = counters->GetCounter("bytesWrittenCompressed", true);
19+
BytesInflightUncompressed = counters->GetCounter("bytesInflightUncompressed", false);
20+
BytesInflightCompressed = counters->GetCounter("bytesInflightCompressed", false);
21+
BytesInflightTotal = counters->GetCounter("bytesInflightTotal", false);
22+
MessagesInflight = counters->GetCounter("messagesInflight", false);
23+
24+
#define TOPIC_COUNTERS_HISTOGRAM_SETUP ::NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100})
25+
TotalBytesInflightUsageByTime = counters->GetHistogram("totalBytesInflightUsageByTime", TOPIC_COUNTERS_HISTOGRAM_SETUP);
26+
UncompressedBytesInflightUsageByTime = counters->GetHistogram("uncompressedBytesInflightUsageByTime", TOPIC_COUNTERS_HISTOGRAM_SETUP);
27+
CompressedBytesInflightUsageByTime = counters->GetHistogram("compressedBytesInflightUsageByTime", TOPIC_COUNTERS_HISTOGRAM_SETUP);
28+
#undef TOPIC_COUNTERS_HISTOGRAM_SETUP
29+
}
30+
31+
TCounterPtr Errors;
32+
TCounterPtr CurrentSessionLifetimeMs;
33+
34+
TCounterPtr BytesWritten;
35+
TCounterPtr MessagesWritten;
36+
TCounterPtr BytesWrittenCompressed;
37+
38+
TCounterPtr BytesInflightUncompressed;
39+
TCounterPtr BytesInflightCompressed;
40+
TCounterPtr BytesInflightTotal;
41+
TCounterPtr MessagesInflight;
42+
43+
//! Histograms reporting % usage of memory limit in time.
44+
//! Provides a histogram looking like: 10% : 100ms, 20%: 300ms, ... 50%: 200ms, ... 100%: 50ms
45+
//! Which means that < 10% memory usage was observed for 100ms during the period and 50% usage was observed for 200ms
46+
//! Used to monitor if the writer successfully deals with data flow provided. Larger values in higher buckets
47+
//! mean that writer is close to overflow (or being overflown) for major periods of time
48+
//! 3 histograms stand for:
49+
//! Total memory usage:
50+
::NMonitoring::THistogramPtr TotalBytesInflightUsageByTime;
51+
//! Memory usage by messages waiting for comression:
52+
::NMonitoring::THistogramPtr UncompressedBytesInflightUsageByTime;
53+
//! Memory usage by compressed messages pending for write:
54+
::NMonitoring::THistogramPtr CompressedBytesInflightUsageByTime;
55+
};
56+
57+
struct TReaderCounters: public TThrRefBase {
58+
using TSelf = TReaderCounters;
59+
using TPtr = TIntrusivePtr<TSelf>;
60+
61+
TReaderCounters() = default;
62+
explicit TReaderCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) {
63+
Errors = counters->GetCounter("errors", true);
64+
CurrentSessionLifetimeMs = counters->GetCounter("currentSessionLifetimeMs", false);
65+
BytesRead = counters->GetCounter("bytesRead", true);
66+
MessagesRead = counters->GetCounter("messagesRead", true);
67+
BytesReadCompressed = counters->GetCounter("bytesReadCompressed", true);
68+
BytesInflightUncompressed = counters->GetCounter("bytesInflightUncompressed", false);
69+
BytesInflightCompressed = counters->GetCounter("bytesInflightCompressed", false);
70+
BytesInflightTotal = counters->GetCounter("bytesInflightTotal", false);
71+
MessagesInflight = counters->GetCounter("messagesInflight", false);
72+
73+
#define TOPIC_COUNTERS_HISTOGRAM_SETUP ::NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100})
74+
TotalBytesInflightUsageByTime = counters->GetHistogram("totalBytesInflightUsageByTime", TOPIC_COUNTERS_HISTOGRAM_SETUP);
75+
UncompressedBytesInflightUsageByTime = counters->GetHistogram("uncompressedBytesInflightUsageByTime", TOPIC_COUNTERS_HISTOGRAM_SETUP);
76+
CompressedBytesInflightUsageByTime = counters->GetHistogram("compressedBytesInflightUsageByTime", TOPIC_COUNTERS_HISTOGRAM_SETUP);
77+
#undef TOPIC_COUNTERS_HISTOGRAM_SETUP
78+
}
79+
80+
TCounterPtr Errors;
81+
TCounterPtr CurrentSessionLifetimeMs;
82+
83+
TCounterPtr BytesRead;
84+
TCounterPtr MessagesRead;
85+
TCounterPtr BytesReadCompressed;
86+
87+
TCounterPtr BytesInflightUncompressed;
88+
TCounterPtr BytesInflightCompressed;
89+
TCounterPtr BytesInflightTotal;
90+
TCounterPtr MessagesInflight;
91+
92+
//! Histograms reporting % usage of memory limit in time.
93+
//! Provides a histogram looking like: 10% : 100ms, 20%: 300ms, ... 50%: 200ms, ... 100%: 50ms
94+
//! Which means < 10% memory usage was observed for 100ms during the period and 50% usage was observed for 200ms.
95+
//! Used to monitor if the read session successfully deals with data flow provided. Larger values in higher buckets
96+
//! mean that read session is close to overflow (or being overflown) for major periods of time.
97+
//!
98+
//! Total memory usage.
99+
::NMonitoring::THistogramPtr TotalBytesInflightUsageByTime;
100+
//! Memory usage by messages waiting that are ready to be received by user.
101+
::NMonitoring::THistogramPtr UncompressedBytesInflightUsageByTime;
102+
//! Memory usage by compressed messages pending for decompression.
103+
::NMonitoring::THistogramPtr CompressedBytesInflightUsageByTime;
104+
};
105+
106+
} // namespace NYdb::NTopic
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#pragma once
2+
3+
#include <ydb-cpp-sdk/util/generic/ptr.h>
4+
#include <ydb-cpp-sdk/util/system/spinlock.h>
5+
#include <ydb-cpp-sdk/util/thread/pool.h>
6+
7+
#include <memory>
8+
9+
namespace NYdb::NTopic {
10+
11+
class IExecutor: public TThrRefBase {
12+
public:
13+
using TPtr = TIntrusivePtr<IExecutor>;
14+
using TFunction = std::function<void()>;
15+
16+
// Is executor asynchronous.
17+
virtual bool IsAsync() const = 0;
18+
19+
// Post function to execute.
20+
virtual void Post(TFunction&& f) = 0;
21+
22+
// Start method.
23+
// This method is idempotent.
24+
// It can be called many times. Only the first one has effect.
25+
void Start() {
26+
std::lock_guard guard(StartLock);
27+
if (!Started) {
28+
DoStart();
29+
Started = true;
30+
}
31+
}
32+
33+
private:
34+
virtual void DoStart() = 0;
35+
36+
private:
37+
bool Started = false;
38+
TAdaptiveLock StartLock;
39+
};
40+
IExecutor::TPtr CreateThreadPoolExecutorAdapter(
41+
std::shared_ptr<IThreadPool> threadPool); // Thread pool is expected to have been started.
42+
IExecutor::TPtr CreateThreadPoolExecutor(size_t threads);
43+
44+
IExecutor::TPtr CreateSyncExecutor();
45+
46+
} // namespace NYdb::NTopic
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#pragma once
2+
3+
#include <ydb-cpp-sdk/client/types/status_codes.h>
4+
#include <ydb-cpp-sdk/library/retry/retry_policy.h>
5+
#include <ydb-cpp-sdk/util/generic/ptr.h>
6+
7+
namespace NYdb::NTopic {
8+
9+
//! Retry policy.
10+
//! Calculates delay before next retry.
11+
//! Has several default implementations:
12+
//! - exponential backoff policy;
13+
//! - retries with fixed interval;
14+
//! - no retries.
15+
16+
struct IRetryPolicy: ::IRetryPolicy<EStatus> {
17+
//!
18+
//! Default implementations.
19+
//!
20+
21+
static TPtr GetDefaultPolicy(); // Exponential backoff with infinite retry attempts.
22+
static TPtr GetNoRetryPolicy(); // Denies all kind of retries.
23+
24+
//! Randomized exponential backoff policy.
25+
static TPtr GetExponentialBackoffPolicy(
26+
TDuration minDelay = TDuration::MilliSeconds(10),
27+
// Delay for statuses that require waiting before retry (such as OVERLOADED).
28+
TDuration minLongRetryDelay = TDuration::MilliSeconds(200), TDuration maxDelay = TDuration::Seconds(30),
29+
size_t maxRetries = std::numeric_limits<size_t>::max(), TDuration maxTime = TDuration::Max(),
30+
double scaleFactor = 2.0, std::function<ERetryErrorClass(EStatus)> customRetryClassFunction = {});
31+
32+
//! Randomized fixed interval policy.
33+
static TPtr GetFixedIntervalPolicy(TDuration delay = TDuration::MilliSeconds(100),
34+
// Delay for statuses that require waiting before retry (such as OVERLOADED).
35+
TDuration longRetryDelay = TDuration::MilliSeconds(300),
36+
size_t maxRetries = std::numeric_limits<size_t>::max(),
37+
TDuration maxTime = TDuration::Max(),
38+
std::function<ERetryErrorClass(EStatus)> customRetryClassFunction = {});
39+
};
40+
41+
} // namespace NYdb::NTopic

include/ydb-cpp-sdk/client/topic/topic.h

Lines changed: 7 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
#pragma once
22

3+
#include "codecs/codecs.h"
4+
#include "common/counters.h"
5+
#include "common/executor.h"
6+
#include "common/retry_policy.h"
7+
38
#include <src/api/grpc/ydb_topic_v1.grpc.pb.h>
9+
410
#include <ydb-cpp-sdk/client/driver/driver.h>
511
#include <ydb-cpp-sdk/client/scheme/scheme.h>
6-
#include <src/client/topic/codecs/codecs.h>
712
#include <ydb-cpp-sdk/client/types/exceptions/exceptions.h>
813

9-
#include <ydb-cpp-sdk/library/monlib/dynamic_counters/counters.h>
1014
#include <ydb-cpp-sdk/library/logger/log.h>
11-
#include <ydb-cpp-sdk/library/retry/retry_policy.h>
12-
#include <ydb-cpp-sdk/util/string/builder.h>
1315

16+
#include <ydb-cpp-sdk/util/string/builder.h>
1417
#include <ydb-cpp-sdk/util/datetime/base.h>
1518
#include <ydb-cpp-sdk/util/generic/size_literals.h>
1619
#include <ydb-cpp-sdk/util/thread/pool.h>
@@ -32,14 +35,6 @@ namespace NYdb {
3235

3336
namespace NYdb::NTopic {
3437

35-
enum class ECodec : ui32 {
36-
RAW = 1,
37-
GZIP = 2,
38-
LZOP = 3,
39-
ZSTD = 4,
40-
CUSTOM = 10000,
41-
};
42-
4338
enum class EMeteringMode : ui32 {
4439
Unspecified = 0,
4540
ReservedCapacity = 1,
@@ -684,71 +679,6 @@ class TContinuationTokenIssuer {
684679
}
685680
};
686681

687-
struct TWriterCounters : public TThrRefBase {
688-
using TSelf = TWriterCounters;
689-
using TPtr = TIntrusivePtr<TSelf>;
690-
691-
explicit TWriterCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters);
692-
693-
::NMonitoring::TDynamicCounters::TCounterPtr Errors;
694-
::NMonitoring::TDynamicCounters::TCounterPtr CurrentSessionLifetimeMs;
695-
696-
::NMonitoring::TDynamicCounters::TCounterPtr BytesWritten;
697-
::NMonitoring::TDynamicCounters::TCounterPtr MessagesWritten;
698-
::NMonitoring::TDynamicCounters::TCounterPtr BytesWrittenCompressed;
699-
700-
::NMonitoring::TDynamicCounters::TCounterPtr BytesInflightUncompressed;
701-
::NMonitoring::TDynamicCounters::TCounterPtr BytesInflightCompressed;
702-
::NMonitoring::TDynamicCounters::TCounterPtr BytesInflightTotal;
703-
::NMonitoring::TDynamicCounters::TCounterPtr MessagesInflight;
704-
705-
//! Histograms reporting % usage of memory limit in time.
706-
//! Provides a histogram looking like: 10% : 100ms, 20%: 300ms, ... 50%: 200ms, ... 100%: 50ms
707-
//! Which means that < 10% memory usage was observed for 100ms during the period and 50% usage was observed for 200ms
708-
//! Used to monitor if the writer successfully deals with data flow provided. Larger values in higher buckets
709-
//! mean that writer is close to overflow (or being overflown) for major periods of time
710-
//! 3 histograms stand for:
711-
//! Total memory usage:
712-
::NMonitoring::THistogramPtr TotalBytesInflightUsageByTime;
713-
//! Memory usage by messages waiting for comression:
714-
::NMonitoring::THistogramPtr UncompressedBytesInflightUsageByTime;
715-
//! Memory usage by compressed messages pending for write:
716-
::NMonitoring::THistogramPtr CompressedBytesInflightUsageByTime;
717-
};
718-
719-
struct TReaderCounters: public TThrRefBase {
720-
using TSelf = TReaderCounters;
721-
using TPtr = TIntrusivePtr<TSelf>;
722-
723-
TReaderCounters() = default;
724-
explicit TReaderCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters);
725-
726-
::NMonitoring::TDynamicCounters::TCounterPtr Errors;
727-
::NMonitoring::TDynamicCounters::TCounterPtr CurrentSessionLifetimeMs;
728-
729-
::NMonitoring::TDynamicCounters::TCounterPtr BytesRead;
730-
::NMonitoring::TDynamicCounters::TCounterPtr MessagesRead;
731-
::NMonitoring::TDynamicCounters::TCounterPtr BytesReadCompressed;
732-
733-
::NMonitoring::TDynamicCounters::TCounterPtr BytesInflightUncompressed;
734-
::NMonitoring::TDynamicCounters::TCounterPtr BytesInflightCompressed;
735-
::NMonitoring::TDynamicCounters::TCounterPtr BytesInflightTotal;
736-
::NMonitoring::TDynamicCounters::TCounterPtr MessagesInflight;
737-
738-
//! Histograms reporting % usage of memory limit in time.
739-
//! Provides a histogram looking like: 10% : 100ms, 20%: 300ms, ... 50%: 200ms, ... 100%: 50ms
740-
//! Which means < 10% memory usage was observed for 100ms during the period and 50% usage was observed for 200ms.
741-
//! Used to monitor if the read session successfully deals with data flow provided. Larger values in higher buckets
742-
//! mean that read session is close to overflow (or being overflown) for major periods of time.
743-
//!
744-
//! Total memory usage.
745-
::NMonitoring::THistogramPtr TotalBytesInflightUsageByTime;
746-
//! Memory usage by messages waiting that are ready to be received by user.
747-
::NMonitoring::THistogramPtr UncompressedBytesInflightUsageByTime;
748-
//! Memory usage by compressed messages pending for decompression.
749-
::NMonitoring::THistogramPtr CompressedBytesInflightUsageByTime;
750-
};
751-
752682
//! Partition session.
753683
struct TPartitionSession: public TThrRefBase, public TPrintable<TPartitionSession> {
754684
using TPtr = TIntrusivePtr<TPartitionSession>;
@@ -1147,73 +1077,6 @@ void TPrintable<TSessionClosedEvent>::DebugString(TStringBuilder& ret, bool prin
11471077

11481078
std::string DebugString(const TReadSessionEvent::TEvent& event);
11491079

1150-
//! Retry policy.
1151-
//! Calculates delay before next retry.
1152-
//! Has several default implementations:
1153-
//! - exponential backoff policy;
1154-
//! - retries with fixed interval;
1155-
//! - no retries.
1156-
1157-
struct IRetryPolicy: ::IRetryPolicy<EStatus> {
1158-
//!
1159-
//! Default implementations.
1160-
//!
1161-
1162-
static TPtr GetDefaultPolicy(); // Exponential backoff with infinite retry attempts.
1163-
static TPtr GetNoRetryPolicy(); // Denies all kind of retries.
1164-
1165-
//! Randomized exponential backoff policy.
1166-
static TPtr GetExponentialBackoffPolicy(
1167-
TDuration minDelay = TDuration::MilliSeconds(10),
1168-
// Delay for statuses that require waiting before retry (such as OVERLOADED).
1169-
TDuration minLongRetryDelay = TDuration::MilliSeconds(200), TDuration maxDelay = TDuration::Seconds(30),
1170-
size_t maxRetries = std::numeric_limits<size_t>::max(), TDuration maxTime = TDuration::Max(),
1171-
double scaleFactor = 2.0, std::function<ERetryErrorClass(EStatus)> customRetryClassFunction = {});
1172-
1173-
//! Randomized fixed interval policy.
1174-
static TPtr GetFixedIntervalPolicy(TDuration delay = TDuration::MilliSeconds(100),
1175-
// Delay for statuses that require waiting before retry (such as OVERLOADED).
1176-
TDuration longRetryDelay = TDuration::MilliSeconds(300),
1177-
size_t maxRetries = std::numeric_limits<size_t>::max(),
1178-
TDuration maxTime = TDuration::Max(),
1179-
std::function<ERetryErrorClass(EStatus)> customRetryClassFunction = {});
1180-
};
1181-
1182-
class IExecutor: public TThrRefBase {
1183-
public:
1184-
using TPtr = TIntrusivePtr<IExecutor>;
1185-
using TFunction = std::function<void()>;
1186-
1187-
// Is executor asynchronous.
1188-
virtual bool IsAsync() const = 0;
1189-
1190-
// Post function to execute.
1191-
virtual void Post(TFunction&& f) = 0;
1192-
1193-
// Start method.
1194-
// This method is idempotent.
1195-
// It can be called many times. Only the first one has effect.
1196-
void Start() {
1197-
std::lock_guard guard(StartLock);
1198-
if (!Started) {
1199-
DoStart();
1200-
Started = true;
1201-
}
1202-
}
1203-
1204-
private:
1205-
virtual void DoStart() = 0;
1206-
1207-
private:
1208-
bool Started = false;
1209-
TAdaptiveLock StartLock;
1210-
};
1211-
IExecutor::TPtr CreateThreadPoolExecutorAdapter(
1212-
std::shared_ptr<IThreadPool> threadPool); // Thread pool is expected to have been started.
1213-
IExecutor::TPtr CreateThreadPoolExecutor(size_t threads);
1214-
1215-
IExecutor::TPtr CreateSyncExecutor();
1216-
12171080
//! Events for write session.
12181081
struct TWriteSessionEvent {
12191082

0 commit comments

Comments
 (0)