@@ -93,12 +93,15 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
93
93
NActors::TLogContextBuilder::Build (NKikimrServices::TX_COLUMNSHARD_WRITE)(" tablet_id" , TabletID ())(" event" , " TEvWritePortionResult" );
94
94
std::vector<TNoDataWrite> noDataWrites = ev->Get ()->DetachNoDataWrites ();
95
95
for (auto && i : noDataWrites) {
96
+ AFL_WARN (NKikimrServices::TX_COLUMNSHARD_WRITE)(" event" , " no_data_write_finished" )(" writing_size" , i.GetDataSize ())(" writing_id" , i.GetWriteMeta ().GetId ());
96
97
Counters.GetWritesMonitor ()->OnFinishWrite (i.GetDataSize (), 1 );
97
98
}
98
99
if (ev->Get ()->GetWriteStatus () == NKikimrProto::OK) {
99
100
std::vector<TInsertedPortions> writtenPacks = ev->Get ()->DetachInsertedPacks ();
100
101
const TMonotonic now = TMonotonic::Now ();
101
102
for (auto && i : writtenPacks) {
103
+ AFL_WARN (NKikimrServices::TX_COLUMNSHARD_WRITE)(" writing_size" , i.GetDataSize ())(" event" , " data_write_finished" )(
104
+ " writing_id" , i.GetWriteMeta ().GetId ());
102
105
Counters.OnWritePutBlobsSuccess (now - i.GetWriteMeta ().GetWriteStartInstant (), i.GetRecordsCount ());
103
106
Counters.GetWritesMonitor ()->OnFinishWrite (i.GetDataSize (), 1 );
104
107
}
@@ -115,6 +118,8 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
115
118
for (auto && i : writtenPacks) {
116
119
Counters.OnWritePutBlobsFailed (now - i.GetWriteMeta ().GetWriteStartInstant (), i.GetRecordsCount ());
117
120
Counters.GetCSCounters ().OnWritePutBlobsFail (now - i.GetWriteMeta ().GetWriteStartInstant ());
121
+ AFL_WARN (NKikimrServices::TX_COLUMNSHARD_WRITE)(" writing_size" , i.GetDataSize ())(" event" , " data_write_error" )(
122
+ " writing_id" , i.GetWriteMeta ().GetId ());
118
123
Counters.GetWritesMonitor ()->OnFinishWrite (i.GetDataSize (), 1 );
119
124
}
120
125
Execute (new TTxBlobsWritingFailed (this , ev->Get ()->GetWriteStatus (), std::move (writtenPacks)), ctx);
@@ -131,10 +136,11 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
131
136
auto baseAggregations = wBuffer.GetAggregations ();
132
137
wBuffer.InitReplyReceived (TMonotonic::Now ());
133
138
134
- Counters.GetWritesMonitor ()->OnFinishWrite (wBuffer.GetSumSize (), wBuffer.GetAggregations ().size ());
135
-
136
139
for (auto && aggr : baseAggregations) {
137
140
const auto & writeMeta = aggr->GetWriteMeta ();
141
+ AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD_WRITE)(" event" , " blobs_write_finished" )(" writing_size" , aggr->GetSize ())(
142
+ " writing_id" , writeMeta.GetId ())(" status" , putResult.GetPutStatus ());
143
+ Counters.GetWritesMonitor ()->OnFinishWrite (aggr->GetSize (), 1 );
138
144
139
145
if (!TablesManager.IsReadyForWrite (writeMeta.GetTableId ())) {
140
146
ACFL_ERROR (" event" , " absent_pathId" )(" path_id" , writeMeta.GetTableId ())(" has_index" , TablesManager.HasPrimaryIndex ());
@@ -210,7 +216,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
210
216
granuleShardingVersion = record.GetGranuleShardingVersion ();
211
217
}
212
218
213
- NEvWrite::TWriteMeta writeMeta (writeId, pathId, source, granuleShardingVersion);
219
+ NEvWrite::TWriteMeta writeMeta (writeId, pathId, source, granuleShardingVersion, TGUID::CreateTimebased (). AsGuidString () );
214
220
if (record.HasModificationType ()) {
215
221
writeMeta.SetModificationType (TEnumOperator<NEvWrite::EModificationType>::DeserializeFromProto (record.GetModificationType ()));
216
222
}
@@ -288,7 +294,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
288
294
writeData.MutableWriteMeta ().SetWriteMiddle1StartInstant (TMonotonic::Now ());
289
295
290
296
NOlap::TWritingContext context (TabletID (), SelfId (), snapshotSchema, StoragesManager, Counters.GetIndexationCounters ().SplitterCounters ,
291
- Counters.GetCSCounters ().WritingCounters , GetLastTxSnapshot ());
297
+ Counters.GetCSCounters ().WritingCounters , GetLastTxSnapshot (), std::make_shared<TAtomicCounter>( 1 ) );
292
298
std::shared_ptr<NConveyor::ITask> task =
293
299
std::make_shared<NOlap::TBuildBatchesTask>(BufferizationWriteActorId, std::move (writeData), context);
294
300
NConveyor::TInsertServiceOperator::AsyncTaskToExecute (task);
@@ -460,6 +466,15 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
460
466
const auto & record = ev->Get ()->Record ;
461
467
const auto source = ev->Sender ;
462
468
const auto cookie = ev->Cookie ;
469
+
470
+ if (!TablesManager.GetPrimaryIndex ()) {
471
+ Counters.GetTabletCounters ()->IncCounter (COUNTER_WRITE_FAIL);
472
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError (
473
+ TabletID (), 0 , NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, " schema not ready for writing" );
474
+ ctx.Send (source, result.release (), 0 , cookie);
475
+ return ;
476
+ }
477
+
463
478
const auto behaviourConclusion = TOperationsManager::GetBehaviour (*ev->Get ());
464
479
AFL_TRACE (NKikimrServices::TX_COLUMNSHARD_WRITE)(" ev_write" , record.DebugString ());
465
480
if (behaviourConclusion.IsFail ()) {
@@ -560,12 +575,10 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
560
575
if (overloadStatus != EOverloadStatus::None) {
561
576
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError (
562
577
TabletID (), 0 , NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, " overload data error" );
563
- OverloadWriteFail (overloadStatus, NEvWrite::TWriteMeta (0 , pathId, source, {}), arrowData->GetSize (), cookie, std::move (result), ctx);
578
+ OverloadWriteFail (overloadStatus, NEvWrite::TWriteMeta (0 , pathId, source, {}, TGUID::CreateTimebased (). AsGuidString () ), arrowData->GetSize (), cookie, std::move (result), ctx);
564
579
return ;
565
580
}
566
581
567
- Counters.GetWritesMonitor ()->OnStartWrite (arrowData->GetSize ());
568
-
569
582
std::optional<ui32> granuleShardingVersionId;
570
583
if (record.HasGranuleShardingVersionId ()) {
571
584
granuleShardingVersionId = record.GetGranuleShardingVersionId ();
@@ -586,10 +599,15 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
586
599
OperationsManager->RegisterLock (lockId, Generation ());
587
600
auto writeOperation = OperationsManager->RegisterOperation (
588
601
pathId, lockId, cookie, granuleShardingVersionId, *mType , AppDataVerified ().FeatureFlags .GetEnableWritePortionsOnInsert ());
602
+
603
+ AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD_WRITE)(" writing_size" , arrowData->GetSize ())(" operation_id" , writeOperation->GetIdentifier ())(
604
+ " in_flight" , Counters.GetWritesMonitor ()->GetWritesInFlight ())(" size_in_flight" , Counters.GetWritesMonitor ()->GetWritesSizeInFlight ());
605
+ Counters.GetWritesMonitor ()->OnStartWrite (arrowData->GetSize ());
606
+
589
607
Y_ABORT_UNLESS (writeOperation);
590
608
writeOperation->SetBehaviour (behaviour);
591
- NOlap::TWritingContext wContext (pathId , SelfId (), schema, StoragesManager, Counters.GetIndexationCounters ().SplitterCounters ,
592
- Counters.GetCSCounters ().WritingCounters , NOlap::TSnapshot::Max ());
609
+ NOlap::TWritingContext wContext (TabletID () , SelfId (), schema, StoragesManager, Counters.GetIndexationCounters ().SplitterCounters ,
610
+ Counters.GetCSCounters ().WritingCounters , NOlap::TSnapshot::Max (), writeOperation-> GetActivityChecker () );
593
611
arrowData->SetSeparationPoints (GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified (pathId)->GetBucketPositions ());
594
612
writeOperation->Start (*this , arrowData, source, wContext);
595
613
}
0 commit comments