Skip to content

Commit 7576700

Browse files
Transfer data from a topic to a table. (#14371)
Co-authored-by: Ilnaz Nizametdinov <i.nizametdinov@gmail.com>
1 parent 67c0c46 commit 7576700

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+1913
-107
lines changed

ydb/core/kqp/runtime/kqp_write_table.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ class TColumnBatch : public IDataBatch {
4444
return result;
4545
}
4646

47+
std::shared_ptr<void> ExtractBatch() override {
48+
return std::dynamic_pointer_cast<void>(Extract());
49+
}
50+
4751
explicit TColumnBatch(const TRecordBatchPtr& data)
4852
: Data(data)
4953
, Memory(NArrow::GetBatchDataSize(Data)) {
@@ -75,6 +79,11 @@ class TRowBatch : public IDataBatch {
7579
return {std::move(Cells), std::move(Data)};
7680
}
7781

82+
std::shared_ptr<void> ExtractBatch() override {
83+
auto r = std::make_shared<std::pair<std::vector<TCell>, std::vector<TCharVectorPtr>>>(std::move(Extract()));
84+
return std::reinterpret_pointer_cast<void>(r);
85+
}
86+
7887
TRowBatch(std::vector<TCell>&& cells, std::vector<TCharVectorPtr>&& data, i64 size, ui32 rows, ui16 columns)
7988
: Cells(std::move(cells))
8089
, Data(std::move(data))

ydb/core/kqp/runtime/kqp_write_table.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ class IDataBatch : public TThrRefBase {
1515
virtual TString SerializeToString() const = 0;
1616
virtual i64 GetMemory() const = 0;
1717
virtual bool IsEmpty() const = 0;
18+
19+
// Returns the batch payload as a type-erased shared pointer.
// Each implementation decides the concrete pointee type.
virtual std::shared_ptr<void> ExtractBatch() = 0;
1820
};
1921

2022
using IDataBatchPtr = TIntrusivePtr<IDataBatch>;
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
#include "purecalc.h"
2+
3+
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
4+
#include <yql/essentials/minikql/computation/mkql_custom_list.h>
5+
#include <yql/essentials/minikql/mkql_string_util.h>
6+
#include <yql/essentials/utils/yql_panic.h>
7+
8+
namespace NYdb::NTopic::NPurecalc {
9+
10+
namespace {
11+
12+
using namespace NYql::NUdf;
13+
using namespace NKikimr::NMiniKQL;
14+
15+
// Member names of the input struct built from a message.
constexpr auto DataFieldName = "_data";
constexpr auto OffsetFieldName = "_offset";

// Number of members in the input struct; update it together with the field list above.
constexpr size_t FieldCount = 2;
19+
20+
struct FieldPositions {
21+
ui64 Data = 0;
22+
ui64 Offset = 0;
23+
};
24+
25+
26+
// Builds the list node ["DataType", fieldType] describing a primitive type.
NYT::TNode CreateTypeNode(const TString& fieldType) {
    auto typeNode = NYT::TNode::CreateList();
    typeNode.Add("DataType");
    typeNode.Add(fieldType);
    return typeNode;
}
31+
32+
// Appends the member description [fieldName, ["DataType", fieldType]] to `node`.
void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
    auto member = NYT::TNode::CreateList();
    member.Add(fieldName);
    member.Add(CreateTypeNode(fieldType));
    node.Add(std::move(member));
}
39+
40+
41+
// Builds the input schema: a struct with a String data member and a Uint64 offset member.
NYT::TNode CreateMessageScheme() {
    auto members = NYT::TNode::CreateList();
    AddField(members, DataFieldName, "String");
    AddField(members, OffsetFieldName, "Uint64");

    auto scheme = NYT::TNode::CreateList();
    scheme.Add("StructType");
    scheme.Add(std::move(members));
    return scheme;
}
50+
51+
// The single input schema, built once at static-initialization time.
// The enclosing anonymous namespace already gives it internal linkage.
const TVector<NYT::TNode> InputSchema{ CreateMessageScheme() };
52+
53+
struct TMessageWrapper {
54+
const TMessage& Message;
55+
56+
NYql::NUdf::TUnboxedValuePod GetData() const {
57+
return NKikimr::NMiniKQL::MakeString(Message.Data);
58+
}
59+
60+
NYql::NUdf::TUnboxedValuePod GetOffset() const {
61+
return NYql::NUdf::TUnboxedValuePod(Message.Offset);
62+
}
63+
};
64+
65+
class TInputConverter {
66+
protected:
67+
IWorker* Worker_;
68+
TPlainContainerCache Cache_;
69+
FieldPositions Position;
70+
71+
public:
72+
explicit TInputConverter(IWorker* worker)
73+
: Worker_(worker)
74+
{
75+
const TStructType* structType = worker->GetInputType();
76+
const ui64 count = structType->GetMembersCount();
77+
78+
for (ui64 i = 0; i < count; ++i) {
79+
const auto name = structType->GetMemberName(i);
80+
if (name == DataFieldName) {
81+
Position.Data = i;
82+
} else if (name == OffsetFieldName) {
83+
Position.Offset = i;
84+
}
85+
}
86+
}
87+
88+
public:
89+
void DoConvert(const TMessage* message, TUnboxedValue& result) {
90+
auto& holderFactory = Worker_->GetGraph().GetHolderFactory();
91+
TUnboxedValue* items = nullptr;
92+
result = Cache_.NewArray(holderFactory, static_cast<ui32>(FieldCount), items);
93+
94+
TMessageWrapper wrap {*message};
95+
items[Position.Data] = wrap.GetData();
96+
items[Position.Offset] = wrap.GetOffset();
97+
}
98+
99+
void ClearCache() {
100+
Cache_.Clear();
101+
}
102+
};
103+
104+
/**
105+
* List (or, better, stream) of unboxed values. Used as an input value in pull workers.
106+
*/
107+
class TMessageListValue final: public TCustomListValue {
108+
private:
109+
mutable bool HasIterator_ = false;
110+
THolder<IStream<TMessage*>> Underlying_;
111+
TInputConverter Converter;
112+
IWorker* Worker_;
113+
TScopedAlloc& ScopedAlloc_;
114+
115+
public:
116+
TMessageListValue(
117+
TMemoryUsageInfo* memInfo,
118+
const TMessageInputSpec& /*inputSpec*/,
119+
THolder<IStream<TMessage*>> underlying,
120+
IWorker* worker
121+
)
122+
: TCustomListValue(memInfo)
123+
, Underlying_(std::move(underlying))
124+
, Converter(worker)
125+
, Worker_(worker)
126+
, ScopedAlloc_(Worker_->GetScopedAlloc())
127+
{
128+
}
129+
130+
~TMessageListValue() override {
131+
{
132+
// This list value stored in the worker's computation graph and destroyed upon the computation
133+
// graph's destruction. This brings us to an interesting situation: scoped alloc is acquired,
134+
// worker and computation graph are half-way destroyed, and now it's our turn to die. The problem is,
135+
// the underlying stream may own another worker. This happens when chaining programs. Now, to destroy
136+
// that worker correctly, we need to release our scoped alloc (because that worker has its own
137+
// computation graph and scoped alloc).
138+
// By the way, note that we shouldn't interact with the worker here because worker is in the middle of
139+
// its own destruction. So we're using our own reference to the scoped alloc. That reference is alive
140+
// because scoped alloc destroyed after computation graph.
141+
auto unguard = Unguard(ScopedAlloc_);
142+
Underlying_.Destroy();
143+
}
144+
}
145+
146+
public:
147+
TUnboxedValue GetListIterator() const override {
148+
YQL_ENSURE(!HasIterator_, "Only one pass over input is supported");
149+
HasIterator_ = true;
150+
return TUnboxedValuePod(const_cast<TMessageListValue*>(this));
151+
}
152+
153+
bool Next(TUnboxedValue& result) override {
154+
const TMessage* message;
155+
{
156+
auto unguard = Unguard(ScopedAlloc_);
157+
message = Underlying_->Fetch();
158+
}
159+
160+
if (!message) {
161+
return false;
162+
}
163+
164+
Converter.DoConvert(message, result);
165+
166+
return true;
167+
}
168+
169+
EFetchStatus Fetch(TUnboxedValue& result) override {
170+
if (Next(result)) {
171+
return EFetchStatus::Ok;
172+
} else {
173+
return EFetchStatus::Finish;
174+
}
175+
}
176+
};
177+
178+
/**
 * Push-mode consumer: converts every incoming message and pushes it into the worker.
 */
class TMessageConsumerImpl final: public IConsumer<TMessage*> {
private:
    TWorkerHolder<IPushStreamWorker> WorkerHolder;
    TInputConverter Converter;

public:
    TMessageConsumerImpl(const TMessageInputSpec& /*inputSpec*/, TWorkerHolder<IPushStreamWorker> worker)
        : WorkerHolder(std::move(worker))
        , Converter(WorkerHolder.Get())
    {
    }

    ~TMessageConsumerImpl() override {
        // The converter cache is cleared while the worker's scoped alloc is held.
        with_lock(WorkerHolder->GetScopedAlloc()) {
            Converter.ClearCache();
        }
    }

public:
    // Converts `message` and pushes the resulting value into the worker.
    void OnObject(TMessage* message) override {
        TBindTerminator terminatorGuard(WorkerHolder->GetGraph().GetTerminator());

        with_lock(WorkerHolder->GetScopedAlloc()) {
            Y_DEFER {
                // The cache is cleared after each object because the values are
                // allocated on another allocator and have to be released.
                Converter.ClearCache();
                WorkerHolder->Invalidate();
            };

            TUnboxedValue value;
            Converter.DoConvert(message, value);
            WorkerHolder->Push(std::move(value));
        }
    }

    // Forwards the end-of-input notification to the worker.
    void OnFinish() override {
        TBindTerminator terminatorGuard(WorkerHolder->GetGraph().GetTerminator());

        with_lock(WorkerHolder->GetScopedAlloc()) {
            WorkerHolder->OnFinish();
        }
    }
};
225+
226+
} // namespace
227+
228+
// Returns the file-level schema list; it holds the single message schema.
const TVector<NYT::TNode>& TMessageInputSpec::GetSchemas() const {
    const TVector<NYT::TNode>& schemas = InputSchema;
    return schemas;
}
231+
232+
} // namespace NYdb::NTopic::NPurecalc
233+
234+
namespace NYql::NPureCalc {
235+
236+
using namespace NYdb::NTopic::NPurecalc;
237+
238+
using ConsumerType = TInputSpecTraits<TMessageInputSpec>::TConsumerType;
239+
240+
// Wraps `stream` into a TMessageListValue and installs it as input 0 of the pull-stream worker.
void TInputSpecTraits<TMessageInputSpec>::PreparePullStreamWorker(
    const TMessageInputSpec& inputSpec,
    IPullStreamWorker* worker,
    THolder<IStream<TMessage*>> stream
) {
    // The list value is created while the worker's scoped alloc is held.
    with_lock(worker->GetScopedAlloc()) {
        auto& holderFactory = worker->GetGraph().GetHolderFactory();
        worker->SetInput(holderFactory.Create<TMessageListValue>(inputSpec, std::move(stream), worker), 0);
    }
}
250+
251+
// Wraps `stream` into a TMessageListValue and installs it as input 0 of the pull-list worker.
void TInputSpecTraits<TMessageInputSpec>::PreparePullListWorker(
    const TMessageInputSpec& inputSpec,
    IPullListWorker* worker,
    THolder<IStream<TMessage*>> stream
) {
    // The list value is created while the worker's scoped alloc is held.
    with_lock(worker->GetScopedAlloc()) {
        auto& holderFactory = worker->GetGraph().GetHolderFactory();
        worker->SetInput(holderFactory.Create<TMessageListValue>(inputSpec, std::move(stream), worker), 0);
    }
}
261+
262+
// Creates the push-mode consumer that takes ownership of `worker`.
ConsumerType TInputSpecTraits<TMessageInputSpec>::MakeConsumer(
    const TMessageInputSpec& inputSpec,
    TWorkerHolder<IPushStreamWorker> worker
) {
    ConsumerType consumer = MakeHolder<TMessageConsumerImpl>(inputSpec, std::move(worker));
    return consumer;
}
268+
269+
} // namespace NYql::NPureCalc
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#pragma once
2+
3+
#include <yql/essentials/public/purecalc/common/interface.h>
4+
5+
namespace NYdb::NTopic::NPurecalc {
6+
7+
using namespace NYql::NPureCalc;
8+
9+
struct TMessage {
10+
TMessage(const TString& data)
11+
: Data(data) {
12+
}
13+
14+
TMessage& WithOffset(ui64 offset) {
15+
Offset = offset;
16+
return *this;
17+
}
18+
19+
const TString& Data;
20+
ui64 Offset = 0;
21+
};
22+
23+
class TMessageInputSpec: public TInputSpecBase {
24+
public:
25+
/**
26+
* Build input spec and associate the given message descriptor.
27+
*/
28+
explicit TMessageInputSpec() = default;
29+
30+
public:
31+
const TVector<NYT::TNode>& GetSchemas() const override;
32+
bool ProvidesBlocks() const override { return false; }
33+
};
34+
35+
}
36+
37+
namespace NYql::NPureCalc {
38+
39+
template<>
40+
struct TInputSpecTraits<NYdb::NTopic::NPurecalc::TMessageInputSpec> {
41+
42+
static const constexpr bool IsPartial = false;
43+
44+
static const constexpr bool SupportPullStreamMode = true;
45+
static const constexpr bool SupportPullListMode = true;
46+
static const constexpr bool SupportPushStreamMode = true;
47+
48+
using TInput = NYdb::NTopic::NPurecalc::TMessage;
49+
using TInputSpecType = NYdb::NTopic::NPurecalc::TMessageInputSpec;
50+
using TConsumerType = THolder<IConsumer<TInput*>>;
51+
52+
static void PreparePullStreamWorker(const TInputSpecType&, IPullStreamWorker*, THolder<IStream<TInput*>>);
53+
static void PreparePullListWorker(const TInputSpecType&, IPullListWorker*, THolder<IStream<TInput*>>);
54+
static TConsumerType MakeConsumer(const TInputSpecType&, TWorkerHolder<IPushStreamWorker>);
55+
};
56+
57+
}

ydb/core/persqueue/purecalc/ya.make

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
purecalc.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/core/fq/libs/row_dispatcher/purecalc_no_pg_wrapper
9+
)
10+
11+
YQL_LAST_ABI_VERSION()
12+
13+
END()

ydb/core/persqueue/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ RECURSE(
8585
config
8686
events
8787
partition_key_range
88+
purecalc
8889
writer
8990
)
9091

0 commit comments

Comments
 (0)