1
1
#include " base_table_writer.h"
2
2
#include " logging.h"
3
+ #include " service.h"
3
4
#include " worker.h"
4
5
5
6
#include < ydb/core/base/tablet_pipecache.h>
16
17
17
18
#include < util/generic/map.h>
18
19
#include < util/generic/maybe.h>
20
+ #include < util/generic/set.h>
19
21
#include < util/string/builder.h>
20
22
21
23
namespace NKikimr ::NReplication::NService {
@@ -426,16 +428,71 @@ class TLocalTableWriter
426
428
427
429
Y_ABORT_UNLESS (PendingRecords.empty ());
428
430
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> records (::Reserve (ev->Get ()->Records .size ()));
431
+ TSet<TRowVersion> versionsWithoutTxId;
429
432
430
- for (auto & record : ev->Get ()->Records ) {
431
- records.emplace_back (record.Offset , TablePathId, record.Data .size ());
432
- auto res = PendingRecords.emplace (
433
- record.Offset , Parser->Parse (ev->Get ()->Source , record.Offset , std::move (record.Data ))
434
- );
435
- Y_ABORT_UNLESS (res.second );
433
+ for (auto & [offset, data, _] : ev->Get ()->Records ) {
434
+ auto record = Parser->Parse (ev->Get ()->Source , offset, std::move (data));
435
+
436
+ if (Mode == EWriteMode::Consistent) {
437
+ const auto version = TRowVersion (record->GetStep (), record->GetTxId ());
438
+
439
+ if (record->GetKind () == NChangeExchange::IChangeRecord::EKind::CdcHeartbeat) {
440
+ TxIds.erase (TxIds.begin (), TxIds.upper_bound (version));
441
+ Send (Worker, new TEvService::TEvHeartbeat (version));
442
+ continue ;
443
+ } else if (record->GetKind () != NChangeExchange::IChangeRecord::EKind::CdcDataChange) {
444
+ Y_ABORT (" Unexpected record kind" );
445
+ }
446
+
447
+ if (auto it = TxIds.upper_bound (version); it != TxIds.end ()) {
448
+ record->RewriteTxId (it->second );
449
+ } else {
450
+ versionsWithoutTxId.insert (version);
451
+ PendingTxId[version].push_back (std::move (record));
452
+ continue ;
453
+ }
454
+ }
455
+
456
+ records.emplace_back (record->GetOrder (), TablePathId, record->GetBody ().size ());
457
+ Y_ABORT_UNLESS (PendingRecords.emplace (record->GetOrder (), std::move (record)).second );
458
+ }
459
+
460
+ if (versionsWithoutTxId) {
461
+ Send (Worker, new TEvService::TEvGetTxId (versionsWithoutTxId));
462
+ }
463
+
464
+ if (records) {
465
+ EnqueueRecords (std::move (records));
466
+ }
467
+ }
468
+
469
+ void Handle (TEvService::TEvTxIdResult::TPtr& ev) {
470
+ LOG_D (" Handle " << ev->Get ()->ToString ());
471
+
472
+ TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> records;
473
+
474
+ for (const auto & kv : ev->Get ()->Record .GetVersionTxIds ()) {
475
+ const auto version = TRowVersion::Parse (kv.GetVersion ());
476
+ TxIds.emplace (version, kv.GetTxId ());
477
+
478
+ for (auto it = PendingTxId.begin (); it != PendingTxId.end ();) {
479
+ if (it->first >= version) {
480
+ break ;
481
+ }
482
+
483
+ for (auto & record : it->second ) {
484
+ record->RewriteTxId (kv.GetTxId ());
485
+ records.emplace_back (record->GetOrder (), TablePathId, record->GetBody ().size ());
486
+ Y_ABORT_UNLESS (PendingRecords.emplace (record->GetOrder (), std::move (record)).second );
487
+ }
488
+
489
+ PendingTxId.erase (it++);
490
+ }
436
491
}
437
492
438
- EnqueueRecords (std::move (records));
493
+ if (records) {
494
+ EnqueueRecords (std::move (records));
495
+ }
439
496
}
440
497
441
498
void Handle (NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev) {
@@ -502,12 +559,14 @@ class TLocalTableWriter
502
559
}
503
560
504
561
explicit TLocalTableWriter (
562
+ EWriteMode mode,
505
563
const TPathId& tablePathId,
506
564
THolder<IChangeRecordParser>&& parser,
507
565
THolder<IChangeRecordSerializer>&& serializer,
508
566
std::function<NChangeExchange::IPartitionResolverVisitor*(const NKikimr::TKeyDesc&)>&& createResolverFn)
509
567
: TActor(&TThis::StateWork)
510
568
, TChangeSender(this , this , this , this , TActorId())
569
+ , Mode(mode)
511
570
, TablePathId(tablePathId)
512
571
, Parser(std::move(parser))
513
572
, Serializer(std::move(serializer))
@@ -523,6 +582,7 @@ class TLocalTableWriter
523
582
switch (ev->GetTypeRewrite ()) {
524
583
hFunc (TEvWorker::TEvHandshake, Handle);
525
584
hFunc (TEvWorker::TEvData, Handle);
585
+ hFunc (TEvService::TEvTxIdResult, Handle);
526
586
hFunc (NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle);
527
587
hFunc (NChangeExchange::TEvChangeExchange::TEvRemoveRecords, Handle);
528
588
hFunc (NChangeExchange::TEvChangeExchangePrivate::TEvReady, Handle);
@@ -536,6 +596,7 @@ class TLocalTableWriter
536
596
537
597
private:
538
598
mutable TMaybe<TString> LogPrefix;
599
+ const EWriteMode Mode;
539
600
const TPathId TablePathId;
540
601
THolder<IChangeRecordParser> Parser;
541
602
THolder<IChangeRecordSerializer> Serializer;
@@ -549,16 +610,19 @@ class TLocalTableWriter
549
610
bool Initialized = false ;
550
611
551
612
TMap<ui64, NChangeExchange::IChangeRecord::TPtr> PendingRecords;
613
+ TMap<TRowVersion, ui64> TxIds; // key is non-inclusive right hand edge
614
+ TMap<TRowVersion, TVector<NChangeExchange::IChangeRecord::TPtr>> PendingTxId;
552
615
553
616
}; // TLocalTableWriter
554
617
555
618
IActor* CreateLocalTableWriter (
556
619
const TPathId& tablePathId,
557
620
THolder<IChangeRecordParser>&& parser,
558
621
THolder<IChangeRecordSerializer>&& serializer,
559
- std::function<NChangeExchange::IPartitionResolverVisitor*(const NKikimr::TKeyDesc&)>&& createResolverFn)
622
+ std::function<NChangeExchange::IPartitionResolverVisitor*(const NKikimr::TKeyDesc&)>&& createResolverFn,
623
+ EWriteMode mode)
560
624
{
561
- return new TLocalTableWriter (tablePathId, std::move (parser), std::move (serializer), std::move (createResolverFn));
625
+ return new TLocalTableWriter (mode, tablePathId, std::move (parser), std::move (serializer), std::move (createResolverFn));
562
626
}
563
627
564
628
} // namespace NKikimr::NReplication::NService
0 commit comments