@@ -163,13 +163,58 @@ Y_UNIT_TEST_SUITE(WithSDK) {
163
163
}
164
164
}
165
165
166
- Y_UNIT_TEST (CommitWithWrongSessionId) {
167
- TTopicSdkTestSetup setup = CreateSetup ();
168
- setup.CreateTopic (std::string{TEST_TOPIC}, std::string{TEST_CONSUMER}, 1 );
166
+ void PrepareFlatTopic (TTopicSdkTestSetup& setup) {
167
+ setup.CreateTopic ();
169
168
170
169
setup.Write (" message-1" );
171
170
setup.Write (" message-2" );
172
171
setup.Write (" message-3" );
172
+ }
173
+
174
+ void PrepareAutopartitionedTopic (TTopicSdkTestSetup& setup) {
175
+ setup.CreateTopicWithAutoscale ();
176
+
177
+ // Creating partition hierarchy
178
+ // 0 ──┬──> 1 ──┬──> 3
179
+ // │ └──> 4
180
+ // └──> 2
181
+ //
182
+ // Each partition has 3 messages
183
+
184
+ setup.Write (" message-0-1" , 0 );
185
+ setup.Write (" message-0-2" , 0 );
186
+ setup.Write (" message-0-3" , 0 );
187
+
188
+ {
189
+ ui64 txId = 1006 ;
190
+ SplitPartition (setup, ++txId, 0 , " a" );
191
+ }
192
+
193
+ setup.Write (" message-1-1" , 1 );
194
+ setup.Write (" message-1-2" , 1 );
195
+ setup.Write (" message-1-3" , 1 );
196
+
197
+ setup.Write (" message-2-1" , 2 );
198
+ setup.Write (" message-2-2" , 2 );
199
+ setup.Write (" message-2-3" , 2 );
200
+
201
+ {
202
+ ui64 txId = 1007 ;
203
+ SplitPartition (setup, ++txId, 1 , " 0" );
204
+ }
205
+
206
+ setup.Write (" message-3-1" , 3 );
207
+ setup.Write (" message-3-2" , 3 );
208
+ setup.Write (" message-3-3" , 3 );
209
+
210
+ setup.Write (" message-4-1" , 4 );
211
+ setup.Write (" message-4-2" , 4 );
212
+ setup.Write (" message-4-3" , 4 );
213
+ }
214
+
215
+ Y_UNIT_TEST (CommitWithWrongSessionId) {
216
+ TTopicSdkTestSetup setup = CreateSetup ();
217
+ PrepareFlatTopic (setup);
173
218
174
219
{
175
220
auto result = setup.Commit (TString (TEST_TOPIC), TEST_CONSUMER, 0 , 1 , " wrong-read-session-id" );
@@ -182,11 +227,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
182
227
183
228
Y_UNIT_TEST (CommitToPastWithWrongSessionId) {
184
229
TTopicSdkTestSetup setup = CreateSetup ();
185
- setup.CreateTopic (std::string{TEST_TOPIC}, std::string{TEST_CONSUMER}, 1 );
186
-
187
- setup.Write (" message-1" );
188
- setup.Write (" message-2" );
189
- setup.Write (" message-3" );
230
+ PrepareFlatTopic (setup);
190
231
191
232
{
192
233
auto result = setup.Commit (TString (TEST_TOPIC), TEST_CONSUMER, 0 , 2 );
@@ -205,8 +246,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
205
246
}
206
247
}
207
248
208
- /* TODO Uncomment this test
209
- Y_UNIT_TEST(CommitToParentPartitionWithWrongSessionId) {
249
+ Y_UNIT_TEST (Commit_ToParentPartitionWithWrongSessionId) {
210
250
TTopicSdkTestSetup setup = CreateSetup ();
211
251
setup.CreateTopicWithAutoscale ();
212
252
@@ -248,7 +288,148 @@ Y_UNIT_TEST_SUITE(WithSDK) {
248
288
Cerr << " >>>>> END" << Endl << Flush;
249
289
250
290
}
251
- */
291
+
292
+ Y_UNIT_TEST (Commit_WithoutSession_ParentNotFinished) {
293
+ TTopicSdkTestSetup setup = CreateSetup ();
294
+ PrepareAutopartitionedTopic (setup);
295
+
296
+ auto getCommittedOffset = [&](size_t partition) {
297
+ auto desc = setup.DescribeConsumer ();
298
+ return desc.GetPartitions ().at (partition).GetPartitionConsumerStats ()->GetCommittedOffset ();
299
+ };
300
+
301
+ {
302
+ // Commit parent partition to non end
303
+ auto result = setup.Commit (TEST_TOPIC, TEST_CONSUMER, 0 , 1 );
304
+ UNIT_ASSERT (result.IsSuccess ());
305
+
306
+ UNIT_ASSERT_VALUES_EQUAL (1 , getCommittedOffset (0 ));
307
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (1 ));
308
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (2 ));
309
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (3 ));
310
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (4 ));
311
+ }
312
+
313
+ {
314
+ auto result = setup.Commit (TEST_TOPIC, TEST_CONSUMER, 1 , 1 );
315
+ UNIT_ASSERT (result.IsSuccess ());
316
+
317
+ UNIT_ASSERT_VALUES_EQUAL (3 , getCommittedOffset (0 ));
318
+ UNIT_ASSERT_VALUES_EQUAL (1 , getCommittedOffset (1 ));
319
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (2 ));
320
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (3 ));
321
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (4 ));
322
+ }
323
+ }
324
+
325
+ Y_UNIT_TEST (Commit_WithoutSession_ToPastParentPartition) {
326
+ TTopicSdkTestSetup setup = CreateSetup ();
327
+ PrepareAutopartitionedTopic (setup);
328
+
329
+ auto getCommittedOffset = [&](size_t partition) {
330
+ auto desc = setup.DescribeConsumer ();
331
+ return desc.GetPartitions ().at (partition).GetPartitionConsumerStats ()->GetCommittedOffset ();
332
+ };
333
+
334
+ {
335
+ // Commit child partition to non end
336
+ auto result = setup.Commit (TEST_TOPIC, TEST_CONSUMER, 3 , 1 );
337
+ UNIT_ASSERT (result.IsSuccess ());
338
+
339
+ UNIT_ASSERT_VALUES_EQUAL (3 , getCommittedOffset (0 ));
340
+ UNIT_ASSERT_VALUES_EQUAL (3 , getCommittedOffset (1 ));
341
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (2 ));
342
+ UNIT_ASSERT_VALUES_EQUAL (1 , getCommittedOffset (3 ));
343
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (4 ));
344
+ }
345
+
346
+ {
347
+ auto result = setup.Commit (TEST_TOPIC, TEST_CONSUMER, 1 , 1 );
348
+ UNIT_ASSERT (result.IsSuccess ());
349
+
350
+ UNIT_ASSERT_VALUES_EQUAL (3 , getCommittedOffset (0 ));
351
+ UNIT_ASSERT_VALUES_EQUAL (1 , getCommittedOffset (1 ));
352
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (2 ));
353
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (3 ));
354
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (4 ));
355
+ }
356
+ }
357
+
358
+ Y_UNIT_TEST (Commit_WithSession_ParentNotFinished) {
359
+ TTopicSdkTestSetup setup = CreateSetup ();
360
+ PrepareAutopartitionedTopic (setup);
361
+
362
+ auto getCommittedOffset = [&](size_t partition) {
363
+ auto desc = setup.DescribeConsumer ();
364
+ return desc.GetPartitions ().at (partition).GetPartitionConsumerStats ()->GetCommittedOffset ();
365
+ };
366
+
367
+ {
368
+ // Commit parent partition to non end
369
+ auto result = setup.Commit (TEST_TOPIC, TEST_CONSUMER, 0 , 1 );
370
+ UNIT_ASSERT (result.IsSuccess ());
371
+
372
+ UNIT_ASSERT_VALUES_EQUAL (1 , getCommittedOffset (0 ));
373
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (1 ));
374
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (2 ));
375
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (3 ));
376
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (4 ));
377
+ }
378
+
379
+ {
380
+ auto r = setup.Read (TEST_TOPIC, TEST_CONSUMER, [&](auto &) {
381
+ return false ;
382
+ });
383
+
384
+ // Commit parent to middle
385
+ auto result = setup.Commit (TEST_TOPIC, TEST_CONSUMER, 1 , 1 , r.StartPartitionSessionEvents .front ().GetPartitionSession ()->GetReadSessionId ());
386
+ UNIT_ASSERT (result.IsSuccess ());
387
+
388
+ UNIT_ASSERT_VALUES_EQUAL (3 , getCommittedOffset (0 ));
389
+ UNIT_ASSERT_VALUES_EQUAL (1 , getCommittedOffset (1 ));
390
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (2 ));
391
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (3 ));
392
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (4 ));
393
+ }
394
+ }
395
+
396
+ Y_UNIT_TEST (Commit_WithSession_ToPastParentPartition) {
397
+ TTopicSdkTestSetup setup = CreateSetup ();
398
+ PrepareAutopartitionedTopic (setup);
399
+
400
+ auto getCommittedOffset = [&](size_t partition) {
401
+ auto desc = setup.DescribeConsumer ();
402
+ return desc.GetPartitions ().at (partition).GetPartitionConsumerStats ()->GetCommittedOffset ();
403
+ };
404
+
405
+ {
406
+ // Commit parent partition to non end
407
+ auto result = setup.Commit (TEST_TOPIC, TEST_CONSUMER, 3 , 1 );
408
+ UNIT_ASSERT (result.IsSuccess ());
409
+
410
+ UNIT_ASSERT_VALUES_EQUAL (3 , getCommittedOffset (0 ));
411
+ UNIT_ASSERT_VALUES_EQUAL (3 , getCommittedOffset (1 ));
412
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (2 ));
413
+ UNIT_ASSERT_VALUES_EQUAL (1 , getCommittedOffset (3 ));
414
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (4 ));
415
+ }
416
+
417
+ {
418
+ auto r = setup.Read (TEST_TOPIC, TEST_CONSUMER, [&](auto &) {
419
+ return false ;
420
+ });
421
+
422
+ // Commit parent to middle
423
+ auto result = setup.Commit (TEST_TOPIC, TEST_CONSUMER, 1 , 1 , r.StartPartitionSessionEvents .front ().GetPartitionSession ()->GetReadSessionId ());
424
+ UNIT_ASSERT (result.IsSuccess ());
425
+
426
+ UNIT_ASSERT_VALUES_EQUAL (3 , getCommittedOffset (0 ));
427
+ UNIT_ASSERT_VALUES_EQUAL (3 , getCommittedOffset (1 ));
428
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (2 ));
429
+ UNIT_ASSERT_VALUES_EQUAL (1 , getCommittedOffset (3 ));
430
+ UNIT_ASSERT_VALUES_EQUAL (0 , getCommittedOffset (4 ));
431
+ }
432
+ }
252
433
}
253
434
254
435
} // namespace NKikimr
0 commit comments