15
15
#include < util/generic/maybe.h>
16
16
#include < util/generic/vector.h>
17
17
#include < util/stream/file.h>
18
- #include < util/string/builder.h>
19
18
#include < util/string/join.h>
20
19
21
20
namespace NYdb {
@@ -83,6 +82,61 @@ bool IsOperationStarted(TStatus operationStatus) {
83
82
84
83
} // anonymous
85
84
85
+ namespace NPrivate {
86
+
87
+ TLocation::TLocation (TStringBuf file, ui64 lineNo)
88
+ : File(file)
89
+ , LineNo(lineNo)
90
+ {
91
+ }
92
+
93
+ void TLocation::Out (IOutputStream& out) const {
94
+ out << File << " :" << LineNo;
95
+ }
96
+
97
+ TLine::TLine (TString&& data, TStringBuf file, ui64 lineNo)
98
+ : Data(std::move(data))
99
+ , Location(file, lineNo)
100
+ {
101
+ }
102
+
103
+ TLine::TLine (TString&& data, const TLocation& location)
104
+ : Data(std::move(data))
105
+ , Location(location)
106
+ {
107
+ }
108
+
109
+ void TBatch::Add (const TLine& line) {
110
+ Data << line.GetData () << " \n " ;
111
+ Locations.push_back (line.GetLocation ());
112
+ }
113
+
114
+ TString TBatch::GetLocation () const {
115
+ THashMap<TStringBuf, std::pair<ui64, ui64>> locations;
116
+ for (const auto & location : Locations) {
117
+ auto it = locations.find (location.File );
118
+ if (it == locations.end ()) {
119
+ it = locations.emplace (location.File , std::make_pair (Max<ui64>(), Min<ui64>())).first ;
120
+ }
121
+ it->second .first = Min (location.LineNo , it->second .first );
122
+ it->second .second = Max (location.LineNo , it->second .second );
123
+ }
124
+
125
+ TStringBuilder result;
126
+ bool comma = false ;
127
+ for (const auto & [file, range] : locations) {
128
+ if (comma) {
129
+ result << " , " ;
130
+ }
131
+ result << file << " :" << range.first << " -" << range.second ;
132
+ comma = true ;
133
+ }
134
+
135
+ return result;
136
+ }
137
+
138
+ } // NPrivate
139
+
86
140
TRestoreClient::TRestoreClient (const TDriver& driver, const std::shared_ptr<TLog>& log)
87
141
: ImportClient(driver)
88
142
, OperationClient(driver)
@@ -371,25 +425,35 @@ TRestoreResult TRestoreClient::RestoreData(const TFsPath& fsPath, const TString&
371
425
return Result<TRestoreResult>(dbPath, std::move (descResult));
372
426
}
373
427
374
- accumulator.Reset (CreateImportDataAccumulator (desc, *actualDesc, settings));
428
+ accumulator.Reset (CreateImportDataAccumulator (desc, *actualDesc, settings, Log ));
375
429
writer.Reset (CreateImportDataWriter (dbPath, desc, ImportClient, TableClient, accumulator.Get (), settings, Log));
376
430
377
431
break ;
378
432
}
379
433
}
380
434
381
435
TWriterWaiter waiter (*writer);
436
+
382
437
ui32 dataFileId = 0 ;
383
438
TFsPath dataFile = fsPath.Child (DataFileName (dataFileId));
439
+ TVector<TString> dataFileNames;
384
440
385
441
while (dataFile.Exists ()) {
386
442
LOG_D (" Read data from " << dataFile.GetPath ().Quote ());
387
443
444
+ dataFileNames.push_back (dataFile);
388
445
TFileInput input (dataFile, settings.FileBufferSize_ );
389
446
TString line;
447
+ ui64 lineNo = 0 ;
390
448
391
449
while (input.ReadLine (line)) {
392
- while (!accumulator->Fits (line)) {
450
+ auto l = NPrivate::TLine (std::move (line), dataFileNames.back (), ++lineNo);
451
+ for (auto status = accumulator->Check (l); status != NPrivate::IDataAccumulator::OK; status = accumulator->Check (l)) {
452
+ if (status == NPrivate::IDataAccumulator::ERROR) {
453
+ return Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR,
454
+ TStringBuilder () << " Invalid data: " << l.GetLocation ());
455
+ }
456
+
393
457
if (!accumulator->Ready (true )) {
394
458
LOG_E (" Error reading data from " << dataFile.GetPath ().Quote ());
395
459
return Result<TRestoreResult>(dbPath, EStatus::INTERNAL_ERROR, " Data is not ready" );
@@ -401,7 +465,7 @@ TRestoreResult TRestoreClient::RestoreData(const TFsPath& fsPath, const TString&
401
465
}
402
466
}
403
467
404
- accumulator->Feed (std::move (line ));
468
+ accumulator->Feed (std::move (l ));
405
469
if (accumulator->Ready ()) {
406
470
if (!writer->Push (accumulator->GetData ())) {
407
471
LOG_E (" Error writing data to " << dbPath.Quote () << " , file: " << dataFile.GetPath ().Quote ());
@@ -518,3 +582,7 @@ TRestoreResult TRestoreClient::RestoreEmptyDir(const TFsPath& fsPath, const TStr
518
582
519
583
} // NDump
520
584
} // NYdb
585
+
586
+ Y_DECLARE_OUT_SPEC (, NYdb::NDump::NPrivate::TLocation, o, x) {
587
+ return x.Out (o);
588
+ }
0 commit comments