@@ -343,6 +343,10 @@ class TLogWrite : public TRequestBase {
343
343
OnDestroy = std::move (onDestroy);
344
344
}
345
345
346
+ void Abort (TActorSystem* actorSystem) override {
347
+ actorSystem->Send (Sender, new NPDisk::TEvLogResult (NKikimrProto::CORRUPTED, 0 , " TLogWrite is being aborted" ));
348
+ }
349
+
346
350
TString ToString () const {
347
351
TStringStream str;
348
352
str << " TLogWrite {" ;
@@ -362,7 +366,6 @@ class TCompletionChunkRead;
362
366
//
363
367
class TChunkRead : public TRequestBase {
364
368
protected:
365
- static TAtomic LastIndex;
366
369
static constexpr ui64 ReferenceCanary = 890461871990457885ull ;
367
370
public:
368
371
ui32 ChunkIdx;
@@ -373,7 +376,6 @@ class TChunkRead : public TRequestBase {
373
376
ui64 CurrentSector = 0 ;
374
377
ui64 RemainingSize;
375
378
TCompletionChunkRead *FinalCompletion = nullptr ;
376
- TAtomicBase Index;
377
379
bool IsReplied = false ;
378
380
379
381
ui64 SlackSize;
@@ -399,7 +401,6 @@ class TChunkRead : public TRequestBase {
399
401
, SlackSize(Max<ui32>())
400
402
, DoubleFreeCanary(ReferenceCanary)
401
403
{
402
- Index = AtomicIncrement (LastIndex);
403
404
}
404
405
405
406
virtual ~TChunkRead () {
@@ -479,8 +480,6 @@ class TChunkReadPiece : public TRequestBase {
479
480
// TChunkWrite
480
481
//
481
482
class TChunkWrite : public TRequestBase {
482
- protected:
483
- static TAtomic LastIndex;
484
483
public:
485
484
ui32 ChunkIdx;
486
485
ui32 Offset;
@@ -494,12 +493,13 @@ class TChunkWrite : public TRequestBase {
494
493
ui32 CurrentPart = 0 ;
495
494
ui32 CurrentPartOffset = 0 ;
496
495
ui32 RemainingSize = 0 ;
497
- ui32 UnenqueuedSize;
498
- TAtomicBase Index;
499
496
500
497
ui32 SlackSize;
501
498
ui32 BytesWritten = 0 ;
502
499
500
+ TAtomic Pieces = 0 ;
501
+ TAtomic Aborted = 0 ;
502
+
503
503
THolder<NPDisk::TCompletionAction> Completion;
504
504
505
505
TChunkWrite (const NPDisk::TEvChunkWrite &ev, const TActorId &sender, TReqId reqId, NWilson::TSpan span)
@@ -511,31 +511,31 @@ class TChunkWrite : public TRequestBase {
511
511
, DoFlush(ev.DoFlush)
512
512
, IsSeqWrite(ev.IsSeqWrite)
513
513
{
514
- Index = AtomicIncrement (LastIndex);
515
514
if (PartsPtr) {
516
515
for (size_t i = 0 ; i < PartsPtr->Size (); ++i) {
517
516
RemainingSize += (*PartsPtr)[i].second ;
518
517
}
519
518
}
520
519
TotalSize = RemainingSize;
521
- UnenqueuedSize = RemainingSize;
522
520
SlackSize = Max<ui32>();
523
521
}
524
522
525
- ERequestType GetType () const override {
526
- return ERequestType::RequestChunkWrite ;
523
+ void RegisterPiece () {
524
+ AtomicIncrement (Pieces) ;
527
525
}
528
526
529
- void EstimateCost (const TDriveModel &drive) override {
530
- Cost = drive.SeekTimeNs () + drive.TimeForSizeNs ((ui64)UnenqueuedSize, ChunkIdx, TDriveModel::OP_TYPE_WRITE);
527
+ void AbortPiece (TActorSystem *actorSystem) {
528
+ if (AtomicDecrement (Pieces) == 0 ) {
529
+ this ->Abort (actorSystem);
530
+ }
531
531
}
532
532
533
- bool IsFinalIteration () {
534
- return UnenqueuedSize <= SlackSize ;
533
+ ERequestType GetType () const override {
534
+ return ERequestType::RequestChunkWrite ;
535
535
}
536
536
537
- bool IsTotallyEnqueued () {
538
- return UnenqueuedSize == 0 ;
537
+ void EstimateCost ( const TDriveModel &drive) override {
538
+ Cost = drive. SeekTimeNs () + drive. TimeForSizeNs ((ui64)TotalSize, ChunkIdx, TDriveModel::OP_TYPE_WRITE) ;
539
539
}
540
540
541
541
bool TryStealSlack (ui64& slackNs, const TDriveModel &drive, ui64 appendBlockSize, bool adhesion) override {
@@ -547,14 +547,20 @@ class TChunkWrite : public TRequestBase {
547
547
if (SlackSize >= appendBlockSize) {
548
548
SlackSize = Min (
549
549
SlackSize / appendBlockSize * appendBlockSize,
550
- (UnenqueuedSize + appendBlockSize - 1 ) / appendBlockSize * appendBlockSize);
550
+ (TotalSize + appendBlockSize - 1 ) / appendBlockSize * appendBlockSize);
551
551
ui64 costNs = (adhesion? 0 : drive.SeekTimeNs ()) + drive.TimeForSizeNs ((ui64)SlackSize, ChunkIdx, TDriveModel::OP_TYPE_WRITE);
552
552
slackNs -= costNs;
553
553
return true ;
554
554
} else {
555
555
return false ;
556
556
}
557
557
}
558
+
559
+ void Abort (TActorSystem* actorSystem) override {
560
+ if (!AtomicSwap (&Aborted, true )) {
561
+ actorSystem->Send (Sender, new NPDisk::TEvChunkWriteResult (NKikimrProto::CORRUPTED, ChunkIdx, Cookie, 0 , " TChunkWrite is being aborted" ));
562
+ }
563
+ }
558
564
};
559
565
560
566
//
@@ -571,7 +577,9 @@ class TChunkWritePiece : public TRequestBase {
571
577
, ChunkWrite(write)
572
578
, PieceShift(pieceShift)
573
579
, PieceSize(pieceSize)
574
- {}
580
+ {
581
+ ChunkWrite->RegisterPiece ();
582
+ }
575
583
576
584
ERequestType GetType () const override {
577
585
return ERequestType::RequestChunkWritePiece;
@@ -581,6 +589,12 @@ class TChunkWritePiece : public TRequestBase {
581
589
Cost = drive.SeekTimeNs () +
582
590
drive.TimeForSizeNs ((ui64)PieceSize, ChunkWrite->ChunkIdx , TDriveModel::OP_TYPE_WRITE);
583
591
}
592
+
593
+ void Abort (TActorSystem* actorSystem) override {
594
+ if (ChunkWrite) {
595
+ ChunkWrite->AbortPiece (actorSystem);
596
+ }
597
+ }
584
598
};
585
599
586
600
//
0 commit comments