Skip to content

Commit 658b27b

Browse files
speed up logging exp (#9087)
1 parent 42b25d4 commit 658b27b

File tree

22 files changed

+128
-70
lines changed

22 files changed

+128
-70
lines changed

ydb/core/formats/arrow/serializer/abstract.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@ NKikimr::TConclusionStatus TSerializerContainer::DeserializeFromRequest(NYql::TF
2121
return TBase::GetObjectPtr()->DeserializeFromRequest(features);
2222
}
2323

24-
std::shared_ptr<NKikimr::NArrow::NSerialization::ISerializer> TSerializerContainer::GetDefaultSerializer() {
24+
std::shared_ptr<ISerializer> TSerializerContainer::GetDefaultSerializer() {
2525
return std::make_shared<TNativeSerializer>();
2626
}
27+
std::shared_ptr<ISerializer> TSerializerContainer::GetFastestSerializer() {
28+
return std::make_shared<TNativeSerializer>(arrow::Compression::UNCOMPRESSED);
29+
}
30+
2731

2832
}

ydb/core/formats/arrow/serializer/abstract.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ class TSerializerContainer: public NBackgroundTasks::TInterfaceProtoContainer<IS
146146
using TBase::DeserializeFromProto;
147147

148148
static std::shared_ptr<ISerializer> GetDefaultSerializer();
149+
static std::shared_ptr<ISerializer> GetFastestSerializer();
149150

150151
TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& proto);
151152

ydb/core/formats/arrow/size_calcer.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -242,12 +242,15 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) {
242242
}
243243

244244
NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context) {
245-
std::optional<TString> specialKeys;
245+
std::optional<TString> specialKeysPayload;
246+
std::optional<TString> specialKeysFull;
246247
if (context.GetFieldsForSpecialKeys().size()) {
247-
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys()).SerializeToString();
248+
TFirstLastSpecialKeys specialKeys(batch, context.GetFieldsForSpecialKeys());
249+
specialKeysPayload = specialKeys.SerializePayloadToString();
250+
specialKeysFull = specialKeys.SerializeFullToString();
248251
}
249-
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
250-
NArrow::GetBatchDataSize(batch), specialKeys);
252+
return TSerializedBatch(NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
253+
NArrow::GetBatchDataSize(batch), specialKeysPayload, specialKeysFull);
251254
}
252255

253256
TConclusionStatus TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR) {
@@ -291,7 +294,7 @@ TConclusion<std::vector<TSerializedBatch>> TSerializedBatch::BuildWithLimit(std:
291294
}
292295

293296
TString TSerializedBatch::DebugString() const {
294-
return TStringBuilder() << "(data_size=" << Data.size() << ";schema_data_size=" << SchemaData.size() << ";rows_count=" << RowsCount << ";raw_bytes=" << RawBytes << ";)";
297+
return TStringBuilder() << "(data_size=" << Data.size() << ";rows_count=" << RowsCount << ";raw_bytes=" << RawBytes << ";)";
295298
}
296299

297300
}

ydb/core/formats/arrow/size_calcer.h

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -70,23 +70,29 @@ class TBatchSplitttingContext {
7070

7171
class TSerializedBatch {
7272
private:
73-
YDB_READONLY_DEF(TString, SchemaData);
7473
YDB_READONLY_DEF(TString, Data);
7574
YDB_READONLY(ui32, RowsCount, 0);
7675
YDB_READONLY(ui32, RawBytes, 0);
77-
std::optional<TString> SpecialKeys;
76+
std::optional<TString> SpecialKeysFull;
77+
std::optional<TString> SpecialKeysPayload;
78+
7879
public:
7980
size_t GetSize() const {
8081
return Data.size();
8182
}
8283

83-
const TString& GetSpecialKeysSafe() const {
84-
AFL_VERIFY(SpecialKeys);
85-
return *SpecialKeys;
84+
const TString& GetSpecialKeysPayloadSafe() const {
85+
AFL_VERIFY(SpecialKeysPayload);
86+
return *SpecialKeysPayload;
87+
}
88+
89+
const TString& GetSpecialKeysFullSafe() const {
90+
AFL_VERIFY(SpecialKeysFull);
91+
return *SpecialKeysFull;
8692
}
8793

8894
bool HasSpecialKeys() const {
89-
return !!SpecialKeys;
95+
return !!SpecialKeysFull;
9096
}
9197

9298
TString DebugString() const;
@@ -95,14 +101,14 @@ class TSerializedBatch {
95101
static TConclusionStatus BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR);
96102
static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context);
97103

98-
TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const std::optional<TString>& specialKeys)
99-
: SchemaData(schemaData)
100-
, Data(data)
104+
TSerializedBatch(TString&& data, const ui32 rowsCount, const ui32 rawBytes,
105+
const std::optional<TString>& specialKeysPayload, const std::optional<TString>& specialKeysFull)
106+
: Data(data)
101107
, RowsCount(rowsCount)
102108
, RawBytes(rawBytes)
103-
, SpecialKeys(specialKeys)
104-
{
105-
109+
, SpecialKeysFull(specialKeysFull)
110+
, SpecialKeysPayload(specialKeysPayload) {
111+
AFL_VERIFY(!!SpecialKeysPayload == !!SpecialKeysFull);
106112
}
107113
};
108114

ydb/core/formats/arrow/special_keys.cpp

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ NKikimr::NArrow::TReplaceKey TSpecialKeys::GetKeyByIndex(const ui32 position, co
2727
}
2828
}
2929

30-
TString TSpecialKeys::SerializeToString() const {
31-
return NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer()->SerializeFull(Data);
30+
TString TSpecialKeys::SerializePayloadToString() const {
31+
return NArrow::NSerialization::TSerializerContainer::GetFastestSerializer()->SerializePayload(Data);
3232
}
3333

34-
TString TSpecialKeys::SerializeToStringDataOnlyNoCompression() const {
35-
return NArrow::SerializeBatchNoCompression(Data);
34+
TString TSpecialKeys::SerializeFullToString() const {
35+
return NArrow::NSerialization::TSerializerContainer::GetFastestSerializer()->SerializeFull(Data);
3636
}
3737

3838
ui64 TSpecialKeys::GetMemoryBytes() const {
@@ -50,13 +50,17 @@ TFirstLastSpecialKeys::TFirstLastSpecialKeys(const std::shared_ptr<arrow::Record
5050
if (columnNames.size()) {
5151
keyBatch = NArrow::TColumnOperator().VerifyIfAbsent().Extract(batch, columnNames);
5252
}
53-
std::vector<ui64> indexes = {0};
54-
if (batch->num_rows() > 1) {
55-
indexes.emplace_back(batch->num_rows() - 1);
56-
}
53+
if (keyBatch->num_rows() <= 2) {
54+
Data = keyBatch;
55+
} else {
56+
std::vector<ui64> indexes = { 0 };
57+
if (batch->num_rows() > 1) {
58+
indexes.emplace_back(batch->num_rows() - 1);
59+
}
5760

58-
Data = NArrow::CopyRecords(keyBatch, indexes);
59-
Y_ABORT_UNLESS(Data->num_rows() == 1 || Data->num_rows() == 2);
61+
Data = NArrow::CopyRecords(keyBatch, indexes);
62+
Y_ABORT_UNLESS(Data->num_rows() == 1 || Data->num_rows() == 2);
63+
}
6064
}
6165

6266
TMinMaxSpecialKeys::TMinMaxSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::shared_ptr<arrow::Schema>& schema) {

ydb/core/formats/arrow/special_keys.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ class TSpecialKeys {
2222
public:
2323
ui64 GetMemoryBytes() const;
2424

25-
TString SerializeToStringDataOnlyNoCompression() const;
26-
2725
TSpecialKeys(const TString& data, const std::shared_ptr<arrow::Schema>& schema) {
2826
Data = NArrow::DeserializeBatch(data, schema);
2927
Y_ABORT_UNLESS(Data);
@@ -34,7 +32,8 @@ class TSpecialKeys {
3432
Y_ABORT_UNLESS(DeserializeFromString(data));
3533
}
3634

37-
TString SerializeToString() const;
35+
TString SerializePayloadToString() const;
36+
TString SerializeFullToString() const;
3837
ui64 GetMemorySize() const;
3938
};
4039

ydb/core/protos/tx_columnshard.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ message TLogicalMetadata {
8989
optional string SpecialKeysRawData = 6;
9090
optional TEvWrite.EModificationType ModificationType = 7;
9191
optional NKikimrArrowSchema.TSchemaSubset SchemaSubset = 8;
92+
optional string SpecialKeysPayloadData = 9;
9293
}
9394

9495
message TEvWriteResult {

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
1010
meta.SetNumRows(batch->GetRowsCount());
1111
meta.SetRawBytes(batch->GetRawBytes());
1212
meta.SetDirtyWriteTimeSeconds(batch.GetStartInstant().Seconds());
13-
meta.SetSpecialKeysRawData(batch->GetSpecialKeysSafe());
13+
meta.SetSpecialKeysRawData(batch->GetSpecialKeysFullSafe());
14+
meta.SetSpecialKeysPayloadData(batch->GetSpecialKeysPayloadSafe());
1415

1516
const auto& blobRange = batch.GetRange();
1617
Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid());

ydb/core/tx/columnshard/columnshard__progress_tx.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
1515
std::optional<NOlap::TSnapshot> LastCompletedTx;
1616
std::optional<TTxController::TPlanQueueItem> PlannedQueueItem;
1717
std::optional<TMonotonic> StartExecution;
18+
const TMonotonic ConstructionInstant = TMonotonic::Now();
1819

1920
public:
2021
TTxProgressTx(TColumnShard* self)
@@ -95,7 +96,8 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
9596
Self->LastCompletedTx = std::max(*LastCompletedTx, Self->LastCompletedTx);
9697
}
9798
if (StartExecution) {
98-
Self->GetProgressTxController().GetCounters().OnTxProgressDuration(TxOperator->GetOpType(), TMonotonic::Now() - *StartExecution);
99+
Self->GetProgressTxController().GetCounters().OnTxExecuteDuration(TxOperator->GetOpType(), TMonotonic::Now() - *StartExecution);
100+
Self->GetProgressTxController().GetCounters().OnTxLiveDuration(TxOperator->GetOpType(), TMonotonic::Now() - ConstructionInstant);
99101
}
100102
Self->SetupIndexation();
101103
}

ydb/core/tx/columnshard/counters/tx_progress.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ class TTxProgressCounters: public TCommonCountersOwner {
2424
NMonitoring::TDynamicCounters::TCounterPtr FinishProposeOnComplete;
2525
NMonitoring::TDynamicCounters::TCounterPtr FinishPlannedTx;
2626
NMonitoring::TDynamicCounters::TCounterPtr AbortTx;
27-
NMonitoring::THistogramPtr HistogramTxProgressDuration;
27+
NMonitoring::THistogramPtr HistogramTxExecuteDuration;
28+
NMonitoring::THistogramPtr HistogramTxLiveDuration;
2829
NMonitoring::THistogramPtr HistogramTxProgressLag;
2930

3031
TProgressCounters(const TCommonCountersOwner& owner)
@@ -37,16 +38,21 @@ class TTxProgressCounters: public TCommonCountersOwner {
3738
, FinishProposeOnComplete(TBase::GetDeriviative("FinishProposeOnComplete"))
3839
, FinishPlannedTx(TBase::GetDeriviative("FinishPlannedTx"))
3940
, AbortTx(TBase::GetDeriviative("AbortTx"))
40-
, HistogramTxProgressDuration(TBase::GetHistogram("TxProgress/Execution/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)))
41+
, HistogramTxExecuteDuration(TBase::GetHistogram("TxProgress/Execution/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)))
42+
, HistogramTxLiveDuration(TBase::GetHistogram("TxProgress/Live/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)))
4143
, HistogramTxProgressLag(TBase::GetHistogram("TxProgress/LagOnComplete/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 5))) {
4244
}
4345
};
4446

4547
THashMap<TOpType, TProgressCounters> CountersByOpType;
4648

4749
public:
48-
void OnTxProgressDuration(const TString& opType, const TDuration d) {
49-
GetSubGroup(opType).HistogramTxProgressDuration->Collect(d.MilliSeconds());
50+
void OnTxExecuteDuration(const TString& opType, const TDuration d) {
51+
GetSubGroup(opType).HistogramTxExecuteDuration->Collect(d.MilliSeconds());
52+
}
53+
54+
void OnTxLiveDuration(const TString& opType, const TDuration d) {
55+
GetSubGroup(opType).HistogramTxLiveDuration->Collect(d.MilliSeconds());
5056
}
5157

5258
void OnTxProgressLag(const TString& opType, const TDuration d) {

0 commit comments

Comments
 (0)