7
7
namespace NKikimr ::NColumnShard {
8
8
9
9
bool TWriteTask::Execute (TColumnShard* owner, const TActorContext& /* ctx */ ) {
10
- auto overloadStatus = owner->CheckOverloadedWait (PathId);
11
- if (overloadStatus != TColumnShard::EOverloadStatus::None) {
12
- AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD_WRITE)(" event" , " wait_overload" )(" status" , overloadStatus);
13
- return false ;
14
- }
15
-
16
10
owner->Counters .GetCSCounters ().WritingCounters ->OnWritingTaskDequeue (TMonotonic::Now () - Created);
17
11
owner->OperationsManager ->RegisterLock (LockId, owner->Generation ());
18
12
auto writeOperation = owner->OperationsManager ->RegisterOperation (
@@ -37,21 +31,39 @@ bool TWriteTasksQueue::Drain(const bool onWakeup, const TActorContext& ctx) {
37
31
if (onWakeup) {
38
32
WriteTasksOverloadCheckerScheduled = false ;
39
33
}
40
- while (WriteTasks.size () && WriteTasks.front ().Execute (Owner, ctx)) {
41
- WriteTasks.pop_front ();
34
+ std::vector<TInternalPathId> toRemove;
35
+ ui32 countTasks = 0 ;
36
+ for (auto && i : WriteTasks) {
37
+ auto overloadStatus = Owner->CheckOverloadedWait (i.first );
38
+ if (overloadStatus != TColumnShard::EOverloadStatus::None) {
39
+ countTasks += i.second .size ();
40
+ AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD_WRITE)(" event" , " wait_overload" )(" status" , overloadStatus)(" path_id" , i.first )(
41
+ " size" , i.second .size ());
42
+ continue ;
43
+ }
44
+ for (auto && t : i.second ) {
45
+ t.Execute (Owner, ctx);
46
+ }
47
+ toRemove.emplace_back (i.first );
42
48
}
43
- if (WriteTasks.size () && !WriteTasksOverloadCheckerScheduled) {
49
+
50
+ for (auto && i : toRemove) {
51
+ AFL_VERIFY (WriteTasks.erase (i));
52
+ }
53
+
54
+ if (countTasks && !WriteTasksOverloadCheckerScheduled) {
44
55
Owner->Schedule (TDuration::MilliSeconds (300 ), new NActors::TEvents::TEvWakeup (1 ));
45
56
WriteTasksOverloadCheckerScheduled = true ;
46
- AFL_WARN (NKikimrServices::TX_COLUMNSHARD)(" event" , " queue_on_write" )(" size" , WriteTasks. size () );
57
+ AFL_WARN (NKikimrServices::TX_COLUMNSHARD)(" event" , " queue_on_write" )(" size" , countTasks );
47
58
}
48
- Owner->Counters .GetCSCounters ().WritingCounters ->QueueWaitSize ->Add ((i64 )WriteTasks. size () - PredWriteTasksSize);
49
- PredWriteTasksSize = (i64 )WriteTasks. size () ;
50
- return !WriteTasks. size () ;
59
+ Owner->Counters .GetCSCounters ().WritingCounters ->QueueWaitSize ->Add ((i64 )countTasks - PredWriteTasksSize);
60
+ PredWriteTasksSize = (i64 )countTasks ;
61
+ return !countTasks ;
51
62
}
52
63
53
64
void TWriteTasksQueue::Enqueue (TWriteTask&& task) {
54
- WriteTasks.emplace_back (std::move (task));
65
+ const TInternalPathId pathId = task.GetPathId ();
66
+ WriteTasks[pathId].emplace_back (std::move (task));
55
67
}
56
68
57
69
TWriteTasksQueue::~TWriteTasksQueue () {
0 commit comments