@@ -80,6 +80,24 @@ bool IsOperationStarted(TStatus operationStatus) {
80
80
return operationStatus.IsSuccess () || operationStatus.GetStatus () == EStatus::STATUS_UNDEFINED;
81
81
}
82
82
83
+ ui32 CountDataFiles (const TFsPath& fsPath) {
84
+ ui32 dataFileId = 0 ;
85
+ TFsPath dataFile = fsPath.Child (DataFileName (dataFileId));
86
+ while (dataFile.Exists ()) {
87
+ dataFile = fsPath.Child (DataFileName (++dataFileId));
88
+ }
89
+ return dataFileId;
90
+ }
91
+
92
+ TRestoreResult CombineResults (const TVector<TRestoreResult>& results) {
93
+ for (auto result : results) {
94
+ if (!result.IsSuccess ()) {
95
+ return result;
96
+ }
97
+ }
98
+ return Result<TRestoreResult>();
99
+ }
100
+
83
101
} // anonymous
84
102
85
103
namespace NPrivate {
@@ -392,95 +410,164 @@ TRestoreResult TRestoreClient::CheckSchema(const TString& dbPath, const TTableDe
392
410
return Result<TRestoreResult>();
393
411
}
394
412
395
- struct TWriterWaiter {
396
- NPrivate::IDataWriter& Writer;
397
-
398
- TWriterWaiter (NPrivate::IDataWriter& writer)
399
- : Writer(writer)
400
- {
401
- }
402
-
403
- ~TWriterWaiter () {
404
- Writer.Wait ();
405
- }
406
- };
407
-
408
- TRestoreResult TRestoreClient::RestoreData (const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const TTableDescription& desc) {
409
- THolder<NPrivate::IDataAccumulator> accumulator;
413
+ THolder<NPrivate::IDataWriter> TRestoreClient::CreateDataWriter (const TString& dbPath, const TRestoreSettings& settings,
414
+ const TTableDescription& desc, const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators)
415
+ {
410
416
THolder<NPrivate::IDataWriter> writer;
411
-
412
417
switch (settings.Mode_ ) {
413
418
case TRestoreSettings::EMode::Yql:
414
419
case TRestoreSettings::EMode::BulkUpsert: {
415
- accumulator.Reset (CreateCompatAccumulator (dbPath, desc, settings));
416
- writer.Reset (CreateCompatWriter (dbPath, TableClient, accumulator.Get (), settings));
420
+ // Need only one accumulator to initialize query string
421
+ writer.Reset (CreateCompatWriter (dbPath, TableClient, accumulators[0 ].Get (), settings));
422
+ break ;
423
+ }
417
424
425
+ case TRestoreSettings::EMode::ImportData: {
426
+ writer.Reset (CreateImportDataWriter (dbPath, desc, ImportClient, TableClient, accumulators, settings, Log));
418
427
break ;
419
428
}
429
+ }
430
+ return writer;
431
+ }
432
+
433
+ TRestoreResult TRestoreClient::CreateDataAccumulators (TVector<THolder<NPrivate::IDataAccumulator>>& outAccumulators,
434
+ const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc, ui32 dataFilesCount)
435
+ {
436
+ const ui32 accumulatorsCount = std::min (settings.InFly_ , dataFilesCount);
437
+ outAccumulators.resize (accumulatorsCount);
438
+
439
+ switch (settings.Mode_ ) {
440
+ case TRestoreSettings::EMode::Yql:
441
+ case TRestoreSettings::EMode::BulkUpsert:
442
+ for (size_t i = 0 ; i < accumulatorsCount; ++i) {
443
+ outAccumulators[i].Reset (CreateCompatAccumulator (dbPath, desc, settings));
444
+ }
445
+ break ;
420
446
421
447
case TRestoreSettings::EMode::ImportData: {
422
448
TMaybe<TTableDescription> actualDesc;
423
449
auto descResult = DescribeTable (TableClient, dbPath, actualDesc);
424
450
if (!descResult.IsSuccess ()) {
425
451
return Result<TRestoreResult>(dbPath, std::move (descResult));
426
452
}
427
-
428
- accumulator.Reset (CreateImportDataAccumulator (desc, *actualDesc, settings, Log));
429
- writer.Reset (CreateImportDataWriter (dbPath, desc, ImportClient, TableClient, accumulator.Get (), settings, Log));
430
-
453
+ for (size_t i = 0 ; i < accumulatorsCount; ++i) {
454
+ outAccumulators[i].Reset (CreateImportDataAccumulator (desc, *actualDesc, settings, Log));
455
+ }
431
456
break ;
432
457
}
433
458
}
459
+ return Result<TRestoreResult>();
460
+ }
434
461
435
- TWriterWaiter waiter (*writer);
436
-
437
- ui32 dataFileId = 0 ;
438
- TFsPath dataFile = fsPath.Child (DataFileName (dataFileId));
439
- TVector<TString> dataFileNames;
440
-
441
- while (dataFile.Exists ()) {
442
- LOG_D (" Read data from " << dataFile.GetPath ().Quote ());
443
-
444
- dataFileNames.push_back (dataFile);
445
- TFileInput input (dataFile, settings.FileBufferSize_ );
446
- TString line;
447
- ui64 lineNo = 0 ;
448
-
449
- while (input.ReadLine (line)) {
450
- auto l = NPrivate::TLine (std::move (line), dataFileNames.back (), ++lineNo);
451
- for (auto status = accumulator->Check (l); status != NPrivate::IDataAccumulator::OK; status = accumulator->Check (l)) {
452
- if (status == NPrivate::IDataAccumulator::ERROR) {
453
- return Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR,
454
- TStringBuilder () << " Invalid data: " << l.GetLocation ());
455
- }
456
-
457
- if (!accumulator->Ready (true )) {
458
- LOG_E (" Error reading data from " << dataFile.GetPath ().Quote ());
459
- return Result<TRestoreResult>(dbPath, EStatus::INTERNAL_ERROR, " Data is not ready" );
460
- }
462
+ TRestoreResult TRestoreClient::RestoreData (const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const TTableDescription& desc) {
463
+ const ui32 dataFilesCount = CountDataFiles (fsPath);
464
+ if (dataFilesCount == 0 ) {
465
+ return Result<TRestoreResult>();
466
+ }
461
467
462
- if (!writer->Push (accumulator->GetData (true ))) {
463
- LOG_E (" Error writing data to " << dbPath.Quote () << " , file: " << dataFile.GetPath ().Quote ());
464
- return Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, " Cannot write data #1" );
468
+ TVector<THolder<NPrivate::IDataAccumulator>> accumulators;
469
+ if (auto res = CreateDataAccumulators (accumulators, dbPath, settings, desc, dataFilesCount); !res.IsSuccess ()) {
470
+ return res;
471
+ }
472
+
473
+ THolder<NPrivate::IDataWriter> writer = CreateDataWriter (dbPath, settings, desc, accumulators);
474
+
475
+ TVector<TRestoreResult> accumulatorWorkersResults (accumulators.size (), Result<TRestoreResult>());
476
+ TThreadPool accumulatorWorkers (TThreadPool::TParams ().SetBlocking (true ));
477
+ accumulatorWorkers.Start (accumulators.size (), accumulators.size ());
478
+
479
+ const ui32 dataFilesPerAccumulator = dataFilesCount / accumulators.size ();
480
+ const ui32 dataFilesPerAccumulatorRemainder = dataFilesCount % accumulators.size ();
481
+ for (ui32 i = 0 ; i < accumulators.size (); ++i) {
482
+ auto * accumulator = accumulators[i].Get ();
483
+
484
+ ui32 dataFileIdStart = dataFilesPerAccumulator * i + std::min (i, dataFilesPerAccumulatorRemainder);
485
+ ui32 dataFileIdEnd = dataFilesPerAccumulator * (i + 1 ) + std::min (i + 1 , dataFilesPerAccumulatorRemainder);
486
+ auto func = [&, i, dataFileIdStart, dataFileIdEnd, accumulator]() {
487
+ for (size_t id = dataFileIdStart; id < dataFileIdEnd; ++id) {
488
+ TFsPath dataFile = fsPath.Child (DataFileName (id));
489
+
490
+ LOG_D (" Read data from " << dataFile.GetPath ().Quote ());
491
+
492
+ TFileInput input (dataFile, settings.FileBufferSize_ );
493
+ TString line;
494
+ ui64 lineNo = 0 ;
495
+
496
+ while (input.ReadLine (line)) {
497
+ auto l = NPrivate::TLine (std::move (line), dataFile.GetPath (), ++lineNo);
498
+
499
+ for (auto status = accumulator->Check (l); status != NPrivate::IDataAccumulator::OK; status = accumulator->Check (l)) {
500
+ if (status == NPrivate::IDataAccumulator::ERROR) {
501
+ accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR,
502
+ TStringBuilder () << " Invalid data: " << l.GetLocation ());
503
+ return ;
504
+ }
505
+
506
+ if (!accumulator->Ready (true )) {
507
+ LOG_E (" Error reading data from " << dataFile.GetPath ().Quote ());
508
+ accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::INTERNAL_ERROR, " Data is not ready" );
509
+ return ;
510
+ }
511
+
512
+ if (!writer->Push (accumulator->GetData (true ))) {
513
+ LOG_E (" Error writing data to " << dbPath.Quote () << " , file: " << dataFile.GetPath ().Quote ());
514
+ accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, " Cannot write data #1" );
515
+ return ;
516
+ }
517
+ }
518
+
519
+ accumulator->Feed (std::move (l));
520
+ if (accumulator->Ready ()) {
521
+ if (!writer->Push (accumulator->GetData ())) {
522
+ LOG_E (" Error writing data to " << dbPath.Quote () << " , file: " << dataFile.GetPath ().Quote ());
523
+ accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, " Cannot write data #2" );
524
+ return ;
525
+ }
526
+ }
465
527
}
466
528
}
467
529
468
- accumulator->Feed (std::move (l));
469
- if (accumulator->Ready ()) {
470
- if (!writer->Push (accumulator->GetData ())) {
471
- LOG_E (" Error writing data to " << dbPath.Quote () << " , file: " << dataFile.GetPath ().Quote ());
472
- return Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, " Cannot write data #2" );
530
+ while (accumulator->Ready (true )) {
531
+ if (!writer->Push (accumulator->GetData (true ))) {
532
+ accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, " Cannot write data #3" );
533
+ return ;
473
534
}
474
535
}
536
+ };
537
+
538
+ if (!accumulatorWorkers.AddFunc (std::move (func))) {
539
+ return Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, " Can't start restoring data: queue is full or shutting down" );
475
540
}
541
+ }
476
542
477
- dataFile = fsPath.Child (DataFileName (++dataFileId));
543
+ accumulatorWorkers.Stop ();
544
+ if (auto res = CombineResults (accumulatorWorkersResults); !res.IsSuccess ()) {
545
+ return res;
478
546
}
479
547
480
- while (accumulator->Ready (true )) {
481
- if (!writer->Push (accumulator->GetData (true ))) {
482
- LOG_E (" Error writing data to " << dbPath.Quote () << " , file: " << dataFile.GetPath ().Quote ());
483
- return Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, " Cannot write data #3" );
548
+ // ensure that all data is restored
549
+ while (true ) {
550
+ writer->Wait ();
551
+
552
+ bool dataFound = false ;
553
+ for (auto & acc : accumulators) {
554
+ if (acc->Ready (true )) {
555
+ dataFound = true ;
556
+ break ;
557
+ }
558
+ }
559
+
560
+ if (dataFound) {
561
+ writer = CreateDataWriter (dbPath, settings, desc, accumulators);
562
+ for (auto & acc : accumulators) {
563
+ while (acc->Ready (true )) {
564
+ if (!writer->Push (acc->GetData (true ))) {
565
+ return Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, " Cannot write data #4" );
566
+ }
567
+ }
568
+ }
569
+ } else {
570
+ break ;
484
571
}
485
572
}
486
573
0 commit comments