@@ -47,11 +47,21 @@ struct Checker : public IChecker {
47
47
UNIT_ASSERT_VALUES_EQUAL_C (Get (value), Expected, msg);
48
48
}
49
49
50
- T Get (const ::Ydb::Value& value);
50
+ virtual T Get (const ::Ydb::Value& value);
51
51
52
52
T Expected;
53
53
};
54
54
55
+ struct DateTimeChecker : public Checker <TInstant> {
56
+ DateTimeChecker (TInstant&& expected)
57
+ : Checker<TInstant>(std::move(expected)) {
58
+ }
59
+
60
+ TInstant Get (const ::Ydb::Value& value) override {
61
+ return TInstant::Seconds (value.uint32_value ());
62
+ }
63
+ };
64
+
55
65
template <>
56
66
inline bool Checker<bool >::Get(const ::Ydb::Value& value) {
57
67
return value.bool_value ();
@@ -95,6 +105,14 @@ std::pair<TString, std::shared_ptr<IChecker>> _C(TString&& name, T&& expected) {
95
105
};
96
106
}
97
107
108
+ template <typename C, typename T>
109
+ std::pair<TString, std::shared_ptr<IChecker>> _T (TString&& name, T&& expected) {
110
+ return {
111
+ std::move (name),
112
+ std::make_shared<C>(std::move (expected))
113
+ };
114
+ }
115
+
98
116
struct TMessage {
99
117
TString Message;
100
118
std::optional<ui32> Partition = std::nullopt;
@@ -150,6 +168,7 @@ struct MainTestCase {
150
168
, ConnectionString(GetEnv(" YDB_ENDPOINT" ) + " /?database=" + GetEnv(" YDB_DATABASE" ))
151
169
, TopicName(TStringBuilder() << " Topic_" << Id)
152
170
, SourceTableName(TStringBuilder() << " SourceTable_" << Id)
171
+ , ChangefeedName(TStringBuilder() << " cdc_" << Id)
153
172
, TableName(TStringBuilder() << " Table_" << Id)
154
173
, ReplicationName(TStringBuilder() << " Replication_" << Id)
155
174
, TransferName(TStringBuilder() << " Transfer_" << Id)
@@ -209,6 +228,16 @@ struct MainTestCase {
209
228
ExecuteDDL (Sprintf (" DROP TABLE `%s`" , SourceTableName.data ()));
210
229
}
211
230
231
+ void AddChangefeed () {
232
+ ExecuteDDL (Sprintf (R"(
233
+ ALTER TABLE `%s`
234
+ ADD CHANGEFEED `%s` WITH (
235
+ MODE = 'UPDATES',
236
+ FORMAT = 'JSON'
237
+ )
238
+ )" , SourceTableName.data (), ChangefeedName.data ()));
239
+ }
240
+
212
241
void CreateTopic (size_t partitionCount = 10 ) {
213
242
ExecuteDDL (Sprintf (R"(
214
243
CREATE TOPIC `%s`
@@ -230,14 +259,19 @@ struct MainTestCase {
230
259
}
231
260
232
261
struct CreateTransferSettings {
262
+ std::optional<TString> TopicName = std::nullopt;
233
263
std::optional<TString> ConsumerName = std::nullopt;
234
- std::optional<TDuration> FlushInterval;
235
- std::optional<ui64> BatchSizeBytes;
264
+ std::optional<TDuration> FlushInterval = TDuration::Seconds(1 );
265
+ std::optional<ui64> BatchSizeBytes = 8_MB;
266
+
267
+ CreateTransferSettings () {};
236
268
237
- CreateTransferSettings ()
238
- : ConsumerName(std::nullopt)
239
- , FlushInterval(TDuration::Seconds(1 ))
240
- , BatchSizeBytes(8_MB) {}
269
+ static CreateTransferSettings WithTopic (const TString& topicName, std::optional<TString> consumerName = std::nullopt) {
270
+ CreateTransferSettings result;
271
+ result.TopicName = topicName;
272
+ result.ConsumerName = consumerName;
273
+ return result;
274
+ }
241
275
242
276
static CreateTransferSettings WithConsumerName (const TString& consumerName) {
243
277
CreateTransferSettings result;
@@ -265,6 +299,8 @@ struct MainTestCase {
265
299
sb << " , BATCH_SIZE_BYTES = " << *settings.BatchSizeBytes << Endl;
266
300
}
267
301
302
+ TString topicName = settings.TopicName .value_or (TopicName);
303
+
268
304
auto ddl = Sprintf (R"(
269
305
%s;
270
306
@@ -274,7 +310,7 @@ struct MainTestCase {
274
310
CONNECTION_STRING = 'grpc://%s'
275
311
%s
276
312
);
277
- )" , lambda.data (), TransferName.data (), TopicName .data (), TableName.data (), ConnectionString.data (), sb.data ());
313
+ )" , lambda.data (), TransferName.data (), topicName .data (), TableName.data (), ConnectionString.data (), sb.data ());
278
314
279
315
ExecuteDDL (ddl);
280
316
}
@@ -558,6 +594,7 @@ struct MainTestCase {
558
594
559
595
const TString TopicName;
560
596
const TString SourceTableName;
597
+ const TString ChangefeedName;
561
598
const TString TableName;
562
599
const TString ReplicationName;
563
600
const TString TransferName;
0 commit comments