@@ -112,23 +112,27 @@ class TRabbit : public NPDisk::TCompletionAction {
112
112
}
113
113
};
114
114
115
- class TCompletionCounter : public NPDisk ::TCompletionAction {
115
+ class TCompletionWorkerWithCounter : public NPDisk ::TCompletionAction {
116
116
public:
117
- TCompletionCounter (TAtomic& counter)
117
+ TCompletionWorkerWithCounter (TAtomic& counter, TDuration workTime = TDuration::Zero() )
118
118
: Counter(counter)
119
+ , WorkTime(workTime)
119
120
{}
120
121
121
122
void Exec (TActorSystem *) override {
123
+ Sleep (WorkTime);
122
124
AtomicIncrement (Counter);
123
125
delete this ;
124
126
}
125
127
126
128
void Release (TActorSystem *) override {
129
+ AtomicIncrement (Counter);
127
130
delete this ;
128
131
}
129
132
130
133
private:
131
134
TAtomic& Counter;
135
+ TDuration WorkTime;
132
136
};
133
137
134
138
static TString MakeDatabasePath (const char *dir) {
@@ -250,7 +254,7 @@ void RunWriteTestWithSectorMap(NPDisk::NSectorMap::EDiskMode diskMode, ui32 disk
250
254
TAtomic expectedWrites = diskSize / (offsetIncrement);
251
255
252
256
for (ui64 offset = 0 ; offset < diskSize; offset += offsetIncrement) {
253
- device->PwriteAsync (data.Get (), data.Size (), offset, new TCompletionCounter (completedWrites), NPDisk::TReqId (NPDisk::TReqId::Test1, 0 ), {});
257
+ device->PwriteAsync (data.Get (), data.Size (), offset, new TCompletionWorkerWithCounter (completedWrites), NPDisk::TReqId (NPDisk::TReqId::Test1, 0 ), {});
254
258
}
255
259
256
260
WaitForValue (&completedWrites, TIMEOUT, expectedWrites);
@@ -341,6 +345,56 @@ Y_UNIT_TEST_SUITE(TBlockDeviceTest) {
341
345
Ctest << " Done" << Endl;
342
346
}
343
347
348
+ Y_UNIT_TEST (WriteReadRestart) {
349
+ using namespace NPDisk ;
350
+
351
+ TActorSystemCreator creator;
352
+ auto start = TMonotonic::Now ();
353
+ while ((TMonotonic::Now () - start).Seconds () < 5 ) {
354
+ const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters;
355
+ THolder<TPDiskMon> mon (new TPDiskMon (counters, 0 , nullptr ));
356
+
357
+ ui32 buffSize = 64_KB;
358
+ ui32 bufferPoolSize = 512 ;
359
+ THolder<NPDisk::TBufferPool> bufferPool (NPDisk::CreateBufferPool (buffSize, bufferPoolSize, false , {}));
360
+ ui64 inFlight = 128 ;
361
+ ui32 maxQueuedCompletionActions = bufferPoolSize / 2 ;
362
+ ui64 diskSize = 32_GB;
363
+
364
+ TIntrusivePtr<NPDisk::TSectorMap> sectorMap = new NPDisk::TSectorMap (diskSize, NSectorMap::DM_NONE);
365
+ THolder<NPDisk::IBlockDevice> device (CreateRealBlockDevice (" " , *mon, 0 , 0 , inFlight, TDeviceMode::None,
366
+ maxQueuedCompletionActions, sectorMap));
367
+ device->Initialize (std::make_shared<TPDiskCtx>(creator.GetActorSystem ()));
368
+
369
+ TAtomic counter = 0 ;
370
+ const i64 totalRequests = 500 ;
371
+ for (i64 i = 0 ; i < totalRequests; i++) {
372
+ auto *completion = new TCompletionWorkerWithCounter (counter, TDuration::MicroSeconds (100 ));
373
+ NPDisk::TBuffer::TPtr buffer (bufferPool->Pop ());
374
+ buffer->FlushAction = completion;
375
+ auto * data = buffer->Data ();
376
+ switch (RandomNumber<ui32>(3 )) {
377
+ case 0 :
378
+ device->PreadAsync (data, 32_KB, 0 , buffer.Release (), TReqId (), nullptr );
379
+ break ;
380
+ case 1 :
381
+ device->PwriteAsync (data, 32_KB, 0 , buffer.Release (), TReqId (), nullptr );
382
+ break ;
383
+ case 2 :
384
+ device->FlushAsync (completion, TReqId ());
385
+ buffer->FlushAction = nullptr ;
386
+ break ;
387
+ default :
388
+ break ;
389
+ }
390
+ }
391
+
392
+ Cerr << AtomicGet (counter) << Endl;
393
+ device.Destroy ();
394
+ UNIT_ASSERT (AtomicGet (counter) == totalRequests);
395
+ }
396
+ }
397
+
344
398
/*
345
399
Y_UNIT_TEST(TestRabbitCompletionAction) {
346
400
const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters;
0 commit comments