Skip to content

Commit b8c0fee

Browse files
authored
Fix bulk CS stats (#20253)
1 parent 5314402 commit b8c0fee

File tree

17 files changed

+74
-18
lines changed

17 files changed

+74
-18
lines changed

ydb/core/protos/counters_columnshard.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ enum ECumulativeCounters {
139139
COUNTER_IMMEDIATE_TX_COMPLETED = 83 [(CounterOpts) = {Name: "ImmediateTxCompleted"}];
140140
COUNTER_ROWS_ERASED = 84 [(CounterOpts) = {Name: "RowsErased"}];
141141
COUNTER_OPERATIONS_ROWS_WRITTEN = 85 [(CounterOpts) = {Name: "OperationsRowsWritten"}];
142+
COUNTER_OPERATIONS_BULK_ROWS_WRITTEN = 86 [(CounterOpts) = {Name: "OperationsBulkRowsWritten"}];
143+
COUNTER_OPERATIONS_BULK_BYTES_WRITTEN = 87 [(CounterOpts) = {Name: "OperationsBulkBytesWritten"}];
142144
}
143145

144146
enum EPercentileCounters {

ydb/core/protos/data_events.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ message TEvWrite {
104104
optional EDataFormat PayloadFormat = 5;
105105
optional string PayloadSchema = 6;
106106
optional uint32 DefaultFilledColumnCount = 7;
107+
optional bool IsBulk = 8;
107108
}
108109

109110
// Transaction operations

ydb/core/protos/tx_columnshard.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,4 +354,5 @@ message TInternalOperationData {
354354
optional uint32 ModificationType = 2;
355355
optional uint64 PathId = 3;
356356
optional bool WritePortions = 4;
357+
optional bool IsBulk = 5;
357358
}

ydb/core/tablet/tablet_counters_aggregator.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,8 @@ class TTabletMon {
817817
TCounterPtr ColumnShardScannedRows_;
818818
TCounterPtr ColumnShardOperationsRowsWritten_;
819819
TCounterPtr ColumnShardOperationsBytesWritten_;
820+
TCounterPtr ColumnShardOperationsBulkRowsWritten_;
821+
TCounterPtr ColumnShardOperationsBulkBytesWritten_;
820822
TCounterPtr ColumnShardErasedBytes_;
821823
TCounterPtr ColumnShardErasedRows_;
822824
THistogramPtr ColumnShardConsumedCpuHistogram;
@@ -969,6 +971,8 @@ class TTabletMon {
969971
ColumnShardScannedRows_ = appGroup->GetCounter("ColumnShard/ScannedRows");
970972
ColumnShardOperationsRowsWritten_ = appGroup->GetCounter("ColumnShard/OperationsRowsWritten");
971973
ColumnShardOperationsBytesWritten_ = appGroup->GetCounter("ColumnShard/OperationsBytesWritten");
974+
ColumnShardOperationsBulkRowsWritten_ = appGroup->GetCounter("ColumnShard/OperationsBulkRowsWritten");
975+
ColumnShardOperationsBulkBytesWritten_ = appGroup->GetCounter("ColumnShard/OperationsBulkBytesWritten");
972976
ColumnShardErasedBytes_ = appGroup->GetCounter("ColumnShard/BytesErased");
973977
ColumnShardErasedRows_ = appGroup->GetCounter("ColumnShard/RowsErased");
974978
ColumnShardConsumedCpuHistogram = appGroup->FindHistogram("HIST(ConsumedCPU)");
@@ -1023,8 +1027,8 @@ class TTabletMon {
10231027
ColumnShardScanBytes_->Set(ColumnShardScannedBytes_->Val());
10241028
ColumnShardWriteRows_->Set(ColumnShardOperationsRowsWritten_->Val());
10251029
ColumnShardWriteBytes_->Set(ColumnShardOperationsBytesWritten_->Val());
1026-
ColumnShardBulkUpsertRows_->Set(ColumnShardOperationsRowsWritten_->Val());
1027-
ColumnShardBulkUpsertBytes_->Set(ColumnShardOperationsBytesWritten_->Val());
1030+
ColumnShardBulkUpsertRows_->Set(ColumnShardOperationsBulkRowsWritten_->Val());
1031+
ColumnShardBulkUpsertBytes_->Set(ColumnShardOperationsBulkBytesWritten_->Val());
10281032
ColumnShardEraseRows_->Set(ColumnShardErasedRows_->Val());
10291033
ColumnShardEraseBytes_->Set(ColumnShardErasedBytes_->Val());
10301034

ydb/core/tx/columnshard/blobs_action/abstract/write.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ class IBlobsWritingAction: public ICommonBlobsAction {
2323
THashSet<TUnifiedBlobId> BlobsWaiting;
2424
bool Aborted = false;
2525
std::shared_ptr<NBlobOperations::TWriteCounters> Counters;
26+
YDB_FLAG_ACCESSOR(Bulk, false);
2627
void AddDataForWrite(const TUnifiedBlobId& blobId, const TString& data);
28+
2729
protected:
2830
virtual void DoOnExecuteTxBeforeWrite(NColumnShard::TColumnShard& self, TBlobManagerDb& dbBlobs) = 0;
2931
virtual void DoOnCompleteTxBeforeWrite(NColumnShard::TColumnShard& self) = 0;

ydb/core/tx/columnshard/blobs_action/bs/write.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@ void TWriteAction::DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& self, co
1515
ui64 blobsWritten = BlobBatch.GetBlobCount();
1616
ui64 bytesWritten = BlobBatch.GetTotalSize();
1717
if (blobsWroteSuccessfully) {
18-
self.Counters.GetTabletCounters()->OnWriteSuccess(blobsWritten, bytesWritten);
18+
if (IsBulk()) {
19+
self.Counters.GetTabletCounters()->OnBulkWriteSuccess(blobsWritten, bytesWritten);
20+
} else {
21+
self.Counters.GetTabletCounters()->OnWriteSuccess(blobsWritten, bytesWritten);
22+
}
1923
Manager->SaveBlobBatchOnComplete(std::move(BlobBatch));
2024
} else {
2125
self.Counters.GetTabletCounters()->OnWriteFailure();

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,23 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
105105
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", i.GetDataSize())("event", "data_write_finished")(
106106
"writing_id", i.GetWriteMeta().GetId());
107107
i.MutableWriteMeta().OnStage(NEvWrite::EWriteStage::SuccessWritingToLocalDB);
108-
Counters.OnWritePutBlobsSuccess(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
108+
if (i.GetWriteMeta().IsBulk()) {
109+
Counters.OnWritePutBulkBlobsSuccess(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
110+
} else {
111+
Counters.OnWritePutBlobsSuccess(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
112+
}
109113
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
110114
}
111115
Execute(new TTxBlobsWritingFinished(this, ev->Get()->GetWriteStatus(), ev->Get()->GetWriteAction(), std::move(writtenData)), ctx);
112116
} else {
113117
const TMonotonic now = TMonotonic::Now();
114118
for (auto&& i : writtenData.GetWriteResults()) {
115119
i.MutableWriteMeta().OnStage(NEvWrite::EWriteStage::FailWritingToLocalDB);
116-
Counters.OnWritePutBlobsFailed(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
120+
if (i.GetWriteMeta().IsBulk()) {
121+
Counters.OnWritePutBulkBlobsFailed(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
122+
} else {
123+
Counters.OnWritePutBlobsFailed(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
124+
}
117125
Counters.GetCSCounters().OnWritePutBlobsFail(now - i.GetWriteMeta().GetWriteStartInstant());
118126
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", i.GetDataSize())("event", "data_write_error")(
119127
"writing_id", i.GetWriteMeta().GetId())("reason", i.GetErrorMessage());
@@ -477,9 +485,11 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
477485
lockId = record.GetLockTxId();
478486
}
479487

488+
const bool isBulk = operation.HasIsBulk() && operation.GetIsBulk();
489+
480490
Counters.GetWritesMonitor()->OnStartWrite(arrowData->GetSize());
481491
WriteTasksQueue->Enqueue(TWriteTask(
482-
arrowData, schema, source, granuleShardingVersionId, pathId, cookie, lockId, *mType, behaviour, writeTimeout, record.GetTxId()));
492+
arrowData, schema, source, granuleShardingVersionId, pathId, cookie, lockId, *mType, behaviour, writeTimeout, record.GetTxId(), isBulk));
483493
WriteTasksQueue->Drain(false, ctx);
484494
}
485495

ydb/core/tx/columnshard/counters/counters_manager.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,20 @@ class TCountersManager {
9898
CSCounters.OnWritePutBlobsSuccess(d);
9999
}
100100

101+
void OnWritePutBulkBlobsSuccess(const TDuration d, const ui64 rowsWritten) const {
102+
TabletCounters->OnWritePutBulkBlobsSuccess(rowsWritten);
103+
CSCounters.OnWritePutBlobsSuccess(d);
104+
}
105+
101106
void OnWritePutBlobsFailed(const TDuration d, const ui64 /*rowsWritten*/) const {
102107
TabletCounters->OnWriteFailure();
103108
CSCounters.OnWritePutBlobsFail(d);
104109
}
110+
111+
void OnWritePutBulkBlobsFailed(const TDuration d, const ui64 /*rowsWritten*/) const {
112+
TabletCounters->OnWriteFailure();
113+
CSCounters.OnWritePutBlobsFail(d);
114+
}
105115
};
106116

107117
} // namespace NKikimr::NColumnShard

ydb/core/tx/columnshard/counters/tablet_counters.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ class TTabletCountersHandle {
7575
IncCounter(NColumnShard::COUNTER_WRITE_SUCCESS);
7676
}
7777

78+
void OnBulkWriteSuccess(const ui64 blobsWritten, const ui64 bytesWritten) const {
79+
IncCounter(NColumnShard::COUNTER_OPERATIONS_BLOBS_WRITTEN, blobsWritten);
80+
IncCounter(NColumnShard::COUNTER_OPERATIONS_BULK_BYTES_WRITTEN, bytesWritten);
81+
IncCounter(NColumnShard::COUNTER_WRITE_SUCCESS);
82+
}
83+
7884
void OnWriteFailure() const {
7985
IncCounter(NColumnShard::COUNTER_WRITE_FAIL);
8086
}
@@ -96,14 +102,18 @@ class TTabletCountersHandle {
96102
IncCounter(NColumnShard::COUNTER_OPERATIONS_ROWS_WRITTEN, rowsWritten);
97103
}
98104

105+
void OnWritePutBulkBlobsSuccess(const ui64 rowsWritten) const {
106+
IncCounter(NColumnShard::COUNTER_OPERATIONS_BULK_ROWS_WRITTEN, rowsWritten);
107+
}
108+
99109
void OnDropPortionEvent(const ui64 rawBytes, const ui64 blobBytes, const ui64 rows) const {
100110
IncCounter(NColumnShard::COUNTER_RAW_BYTES_ERASED, rawBytes);
101111
IncCounter(NColumnShard::COUNTER_BYTES_ERASED, blobBytes);
102112
IncCounter(NColumnShard::COUNTER_ROWS_ERASED, rows);
103113
}
104114

105115
void FillStats(::NKikimrTableStats::TTableStats& output) const {
106-
output.SetRowUpdates(GetValue(COUNTER_OPERATIONS_ROWS_WRITTEN));
116+
output.SetRowUpdates(GetValue(COUNTER_OPERATIONS_ROWS_WRITTEN) + GetValue(COUNTER_OPERATIONS_BULK_ROWS_WRITTEN));
107117
output.SetRowDeletes(GetValue(COUNTER_ROWS_ERASED));
108118
output.SetRowReads(0); // all reads are range reads
109119
output.SetRangeReadRows(GetValue(COUNTER_READ_INDEX_ROWS));

ydb/core/tx/columnshard/operations/manager.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ bool TOperationsManager::Load(NTabletFlatExecutor::TTransactionContext& txc) {
2929
Y_ABORT_UNLESS(metaProto.ParseFromString(metadata));
3030

3131
auto operation = std::make_shared<TWriteOperation>(TUnifiedPathId{}, writeId, lockId, cookie, status, TInstant::Seconds(createdAtSec),
32-
granuleShardingVersionId, NEvWrite::EModificationType::Upsert);
32+
granuleShardingVersionId, NEvWrite::EModificationType::Upsert, metaProto.GetIsBulk());
3333
operation->FromProto(metaProto);
3434
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "register_operation_on_load")("operation_id", operation->GetWriteId());
3535
AFL_VERIFY(operation->GetStatus() != EOperationStatus::Draft);
@@ -203,10 +203,10 @@ void TOperationsManager::LinkTransactionOnComplete(const ui64 /*lockId*/, const
203203
}
204204

205205
TWriteOperation::TPtr TOperationsManager::CreateWriteOperation(const TUnifiedPathId& pathId, const ui64 lockId, const ui64 cookie,
206-
const std::optional<ui32> granuleShardingVersionId, const NEvWrite::EModificationType mType) {
206+
const std::optional<ui32> granuleShardingVersionId, const NEvWrite::EModificationType mType, const bool isBulk) {
207207
auto writeId = BuildNextOperationWriteId();
208208
auto operation = std::make_shared<TWriteOperation>(pathId, writeId, lockId, cookie, EOperationStatus::Draft, AppData()->TimeProvider->Now(),
209-
granuleShardingVersionId, mType);
209+
granuleShardingVersionId, mType, isBulk);
210210
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "register_operation")("operation_id", operation->GetWriteId())(
211211
"last", LastWriteId);
212212
AFL_VERIFY(Operations.emplace(operation->GetWriteId(), operation).second);

0 commit comments

Comments
 (0)