Skip to content

Commit 6ea5cd0

Browse files
authored
Memory tracking for sinks (#16102)
1 parent f081b8b commit 6ea5cd0

File tree

15 files changed

+712
-387
lines changed

15 files changed

+712
-387
lines changed

.github/config/muted_ya.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ ydb/core/kqp/ut/olap KqpOlapWrite.TierDraftsGCWithRestart
2525
ydb/core/kqp/ut/olap [*/*] chunk chunk
2626
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable+ColumnStore
2727
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable-ColumnStore
28-
ydb/core/kqp/ut/query KqpLimits.OutOfSpaceYQLUpsertFail+useSink
29-
ydb/core/kqp/ut/query KqpLimits.QSReplySizeEnsureMemoryLimits+useSink
3028
ydb/core/kqp/ut/query KqpStats.SysViewClientLost
3129
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
3230
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication

ydb/core/formats/arrow/arrow_batch_builder.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,13 @@ bool TRecordBatchReader::DeserializeFromStrings(const TString& schemaString, con
183183
return true;
184184
}
185185

186-
TArrowBatchBuilder::TArrowBatchBuilder(arrow::Compression::type codec, const std::set<std::string>& notNullColumns)
186+
TArrowBatchBuilder::TArrowBatchBuilder(
187+
arrow::Compression::type codec,
188+
const std::set<std::string>& notNullColumns,
189+
arrow::MemoryPool* memoryPool)
187190
: WriteOptions(arrow::ipc::IpcWriteOptions::Defaults())
188191
, NotNullColumns(notNullColumns)
192+
, MemoryPool(memoryPool)
189193
{
190194
Y_ABORT_UNLESS(arrow::util::Codec::IsAvailable(codec));
191195
auto resCodec = arrow::util::Codec::Create(codec);
@@ -201,7 +205,7 @@ arrow::Status TArrowBatchBuilder::Start(const std::vector<std::pair<TString, NSc
201205
if (!schema.ok()) {
202206
return arrow::Status::FromArgs(schema.status().code(), "Cannot make arrow schema: ", schema.status().ToString());
203207
}
204-
auto status = arrow::RecordBatchBuilder::Make(*schema, arrow::default_memory_pool(), RowsToReserve, &BatchBuilder);
208+
auto status = arrow::RecordBatchBuilder::Make(*schema, MemoryPool, RowsToReserve, &BatchBuilder);
205209
NumRows = NumBytes = 0;
206210
if (!status.ok()) {
207211
return arrow::Status::FromArgs(schema.status().code(), "Cannot make arrow builder: ", status.ToString());

ydb/core/formats/arrow/arrow_batch_builder.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,10 @@ class TArrowBatchBuilder : public NKikimr::IBlockBuilder {
149149

150150
/// @note compression is disabled by default KIKIMR-11690
151151
// Allowed codecs: UNCOMPRESSED, LZ4_FRAME, ZSTD
152-
TArrowBatchBuilder(arrow::Compression::type codec = arrow::Compression::UNCOMPRESSED, const std::set<std::string>& notNullColumns = {});
152+
TArrowBatchBuilder(
153+
arrow::Compression::type codec = arrow::Compression::UNCOMPRESSED,
154+
const std::set<std::string>& notNullColumns = {},
155+
arrow::MemoryPool* memoryPool = arrow::default_memory_pool());
153156
~TArrowBatchBuilder() = default;
154157

155158
bool Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns,
@@ -197,6 +200,7 @@ class TArrowBatchBuilder : public NKikimr::IBlockBuilder {
197200
std::shared_ptr<arrow::RecordBatch> Batch;
198201
size_t RowsToReserve{DEFAULT_ROWS_TO_RESERVE};
199202
const std::set<std::string> NotNullColumns;
203+
arrow::MemoryPool* MemoryPool;
200204

201205
protected:
202206
size_t NumRows{0};

ydb/core/kqp/common/buffer/buffer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ struct TKqpBufferWriterSettings {
1212
NWilson::TTraceId TraceId;
1313
TIntrusivePtr<TKqpCounters> Counters;
1414
TIntrusivePtr<NTxProxy::TTxProxyMon> TxProxyMon;
15+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
1516
};
1617

1718
NActors::IActor* CreateKqpBufferWriterActor(TKqpBufferWriterSettings&& settings);

0 commit comments

Comments
 (0)