Skip to content

Commit 6aec996

Browse files
committed
YQL-19429: Fix DQ Writer
YQL-19429: Fix DQ Writer commit_hash:12a44b5b3da377353b0e6452e7dfa143114cd330
1 parent 05e68c7 commit 6aec996

File tree

1 file changed

+81
-82
lines changed

1 file changed

+81
-82
lines changed

yt/yql/providers/yt/comp_nodes/dq/dq_yt_writer.cpp

Lines changed: 81 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,14 @@ class TYtDqWideWriteWrapper final : public TStatefulFlowCodegeneratorNode<TYtDqW
2828
ITransactionPtr&& transaction,
2929
THolder<TMkqlIOSpecs>&& specs,
3030
TRawTableWriterPtr&& outStream,
31-
THolder<TMkqlWriterImpl>&& writer
31+
THolder<TMkqlWriterImpl>&& writer,
32+
size_t width,
33+
IComputationWideFlowNode* flow
3234
)
3335
: TComputationValue<TWriterState>(memInfo)
34-
, Client(std::move(client)), Transaction(std::move(transaction))
35-
, Specs(std::move(specs)), OutStream(std::move(outStream)), Writer(std::move(writer))
36+
, Client_(std::move(client)), Transaction_(std::move(transaction))
37+
, Specs_(std::move(specs)), OutStream_(std::move(outStream)), Writer_(std::move(writer))
38+
, Values_(width), Fields_(GetPointers(Values_)), Flow_(flow)
3639
{}
3740

3841
~TWriterState() override {
@@ -42,24 +45,37 @@ class TYtDqWideWriteWrapper final : public TStatefulFlowCodegeneratorNode<TYtDqW
4245
}
4346
}
4447

45-
void AddRow(const NUdf::TUnboxedValuePod* row) const {
46-
Writer->AddFlatRow(row);
48+
// Pulls one wide row from Flow_ into Values_ (through the Fields_ pointer array)
// and forwards it to the table writer.
//
// The status returned mirrors what Flow_->FetchValues() reported:
//   One    - the fetched row was appended with Writer_->AddFlatRow();
//   Yield  - passed through unchanged, nothing is written;
//   Finish - Finish() is called first, then Finish is returned.
EFetchResult FetchRead(TComputationContext& ctx) {
    switch (Flow_->FetchValues(ctx, Fields_.data())) {
        case EFetchResult::One:
            Writer_->AddFlatRow(Values_.data());
            // Return One explicitly. Without this return, control fell through into
            // the Yield case, so every written row was reported as Yield and the
            // One handling in DoCalculate / the generated code was never reached.
            return EFetchResult::One;
        case EFetchResult::Yield:
            return EFetchResult::Yield;
        case EFetchResult::Finish:
            Finish();
            return EFetchResult::Finish;
    }
}
4859

4960
// Finalizes the writer and then the output stream, at most once: the work is
// guarded by the Finished flag, which is set after the guarded section, so
// repeated calls do nothing. In the added (+) version the row buffers
// Values_ and Fields_ are cleared as well.
void Finish() {
50-
if (!Finished) {
51-
Writer->Finish();
52-
OutStream->Finish();
61+
if (!Finished_) {
62+
Writer_->Finish();
63+
OutStream_->Finish();
64+
Values_.clear();
65+
Fields_.clear();
5366
}
54-
Finished = true;
67+
Finished_ = true;
5568
}
5669
private:
57-
bool Finished = false;
58-
const IClientPtr Client;
59-
const ITransactionPtr Transaction;
60-
const THolder<TMkqlIOSpecs> Specs;
61-
const TRawTableWriterPtr OutStream;
62-
const THolder<TMkqlWriterImpl> Writer;
70+
bool Finished_ = false;
71+
const IClientPtr Client_;
72+
const ITransactionPtr Transaction_;
73+
const THolder<TMkqlIOSpecs> Specs_;
74+
const TRawTableWriterPtr OutStream_;
75+
const THolder<TMkqlWriterImpl> Writer_;
76+
std::vector<NUdf::TUnboxedValue> Values_;
77+
std::vector<NUdf::TUnboxedValue*> Fields_;
78+
IComputationWideFlowNode* Flow_;
6379
};
6480

6581
public:
@@ -82,117 +98,100 @@ class TYtDqWideWriteWrapper final : public TStatefulFlowCodegeneratorNode<TYtDqW
8298
, OutSpec(outSpec)
8399
, WriterOptions(writerOptions)
84100
, CodecCtx(std::move(codecCtx))
85-
, Values(Representations.size()), Fields(GetPointers(Values))
86101
{}
87102

88103
// Performs one step of the writer node.
//  - If `state` is already the finish marker, it is returned as is.
//  - If `state` is invalid, MakeState(ctx, state) creates the writer state first.
//  - The fetch result is then mapped to the node output:
//      One    -> Void,
//      Yield  -> MakeYield(),
//      Finish -> `state` is overwritten with MakeFinish() and returned.
// In the added (+) version fetching and writing are delegated to
// TWriterState::FetchRead(); the removed (-) version fetched from Flow directly
// and called AddRow()/Finish() on the state itself.
NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
89104
if (state.IsFinish()) {
90105
return state;
91-
} else if (state.IsInvalid())
106+
} else if (state.IsInvalid()) {
92107
MakeState(ctx, state);
93-
94-
switch (const auto ptr = static_cast<TWriterState*>(state.AsBoxed().Get()); Flow->FetchValues(ctx, Fields.data())) {
108+
}
109+
auto result = static_cast<TWriterState*>(state.AsBoxed().Get())->FetchRead(ctx);
110+
switch (result) {
95111
case EFetchResult::One:
96-
ptr->AddRow(Values.data());
97112
return NUdf::TUnboxedValuePod::Void();
98113
case EFetchResult::Yield:
99114
return NUdf::TUnboxedValuePod::MakeYield();
100115
case EFetchResult::Finish:
101-
ptr->Finish();
102116
state = NUdf::TUnboxedValuePod::MakeFinish();
103117
return state;
104118
}
105119
}
120+
106121
#ifndef MKQL_DISABLE_CODEGEN
107122
Value* DoGenerateGetValue(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
108123
auto& context = ctx.Codegen.GetContext();
109124

110125
const auto valueType = Type::getInt128Ty(context);
111-
const auto indexType = Type::getInt32Ty(context);
112126
const auto structPtrType = PointerType::getUnqual(StructType::get(context));
113-
const auto arrayType = ArrayType::get(valueType, Representations.size());
114-
const auto values = new AllocaInst(arrayType, 0U, "values", &ctx.Func->getEntryBlock().back());
115127

116128
const auto init = BasicBlock::Create(context, "init", ctx.Func);
117129
const auto next = BasicBlock::Create(context, "next", ctx.Func);
118-
const auto work = BasicBlock::Create(context, "work", ctx.Func);
130+
const auto returnOne = BasicBlock::Create(context, "returnOne", ctx.Func);
131+
const auto returnYield = BasicBlock::Create(context, "returnYield", ctx.Func);
132+
const auto returnFinish = BasicBlock::Create(context, "returnFinish", ctx.Func);
119133
const auto done = BasicBlock::Create(context, "done", ctx.Func);
120134
const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
121135

122-
const auto output = PHINode::Create(valueType, 4U, "output", exit);
123-
output->addIncoming(GetFinish(context), block);
124-
136+
const auto output = PHINode::Create(valueType, 2U, "output", exit);
125137
const auto check = new LoadInst(valueType, statePtr, "check", block);
126138
const auto choise = SwitchInst::Create(check, next, 2U, block);
139+
// if (state.IsFinish()) => goto returnFinish
140+
choise->addCase(GetFinish(context), returnFinish);
141+
// if (state.IsInvalid()) => goto MakeState(ctx, state)
127142
choise->addCase(GetInvalid(context), init);
128-
choise->addCase(GetFinish(context), exit);
129-
130-
block = init;
131-
132-
const auto ptrType = PointerType::getUnqual(StructType::get(context));
133-
const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
134-
const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr<&TYtDqWideWriteWrapper::MakeState>());
135-
const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
136-
const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
137-
CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
138-
139-
BranchInst::Create(next, block);
140-
141-
block = next;
142-
143-
const auto result = GetNodeValues(Flow, ctx, block);
144-
145-
output->addIncoming(GetYield(context), block);
146-
147-
const auto way = SwitchInst::Create(result.first, work, 2U, block);
148-
way->addCase(ConstantInt::get(indexType, static_cast<i32>(EFetchResult::Yield)), exit);
149-
way->addCase(ConstantInt::get(indexType, static_cast<i32>(EFetchResult::Finish)), done);
150-
143+
// Calling MakeState
151144
{
152-
block = work;
153-
154-
TSmallVec<Value*> fields;
155-
fields.reserve(Representations.size());
156-
for (ui32 i = 0U; i < Representations.size(); ++i) {
157-
const auto pointer = GetElementPtrInst::CreateInBounds(arrayType, values, {ConstantInt::get(indexType, 0), ConstantInt::get(indexType, i)}, (TString("ptr_") += ToString(i)).c_str(), block);
158-
fields.emplace_back(result.second[i](ctx, block));
159-
new StoreInst(fields.back(), pointer, block);
160-
}
145+
block = init;
146+
147+
const auto ptrType = PointerType::getUnqual(StructType::get(context));
148+
const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
149+
const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr<&TYtDqWideWriteWrapper::MakeState>());
150+
const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
151+
const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "make_state", block);
152+
CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
153+
BranchInst::Create(next, block);
154+
}
161155

156+
{
157+
// Calling state->FetchRead()
158+
block = next;
162159
const auto state = new LoadInst(valueType, statePtr, "state", block);
163160
const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
164161
const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, structPtrType, "state_arg", block);
165162

166-
const auto addFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr<&TWriterState::AddRow>());
167-
const auto addType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType(), values->getType()}, false);
168-
const auto addPtr = CastInst::Create(Instruction::IntToPtr, addFunc, PointerType::getUnqual(addType), "write", block);
169-
CallInst::Create(addType, addPtr, {stateArg, values}, "", block);
163+
const auto statusType = Type::getInt32Ty(context);
170164

171-
for (ui32 i = 0U; i < Representations.size(); ++i) {
172-
ValueCleanup(Representations[i], fields[i], ctx, block);
173-
}
165+
const auto fetchFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr<&TWriterState::FetchRead>());
166+
const auto fetchType = FunctionType::get(statusType, {stateArg->getType(), ctx.Ctx->getType()}, false);
167+
const auto fetchFunPtr = CastInst::Create(Instruction::IntToPtr, fetchFunc, PointerType::getUnqual(fetchType), "fetch_function", block);
168+
const auto fetchResult = CallInst::Create(fetchType, fetchFunPtr, {stateArg, ctx.Ctx}, "call_fetch_fun", block);
174169

170+
const auto way = SwitchInst::Create(fetchResult, returnOne, 2U, block);
171+
way->addCase(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), returnYield);
172+
way->addCase(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Finish)), done);
173+
}
174+
{
175+
block = returnOne;
175176
output->addIncoming(GetFalse(context), block);
176177
BranchInst::Create(exit, block);
177178
}
178-
179+
{
180+
block = returnYield;
181+
output->addIncoming(GetYield(context), block);
182+
BranchInst::Create(exit, block);
183+
}
184+
{
185+
block = returnFinish;
186+
output->addIncoming(GetFinish(context), block);
187+
BranchInst::Create(exit, block);
188+
}
179189
{
180190
block = done;
181-
182-
const auto state = new LoadInst(valueType, statePtr, "state", block);
183-
const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
184-
const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, structPtrType, "state_arg", block);
185-
186-
const auto finishFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr<&TWriterState::Finish>());
187-
const auto finishType = FunctionType::get(Type::getVoidTy(context), {stateArg->getType()}, false);
188-
const auto finishPtr = CastInst::Create(Instruction::IntToPtr, finishFunc, PointerType::getUnqual(finishType), "finish", block);
189-
CallInst::Create(finishType, finishPtr, {stateArg}, "", block);
190-
191+
// state = MakeFinish()
191192
UnRefBoxed(statePtr, ctx, block);
192193
new StoreInst(GetFinish(context), statePtr, block);
193-
194-
output->addIncoming(GetFinish(context), block);
195-
BranchInst::Create(exit, block);
194+
BranchInst::Create(returnFinish, block);
196195
}
197196

198197
block = exit;
@@ -225,7 +224,7 @@ class TYtDqWideWriteWrapper final : public TStatefulFlowCodegeneratorNode<TYtDqW
225224
auto writer = MakeHolder<TMkqlWriterImpl>(outStream, 4_MB);
226225
writer->SetSpecs(*specs);
227226

228-
state = ctx.HolderFactory.Create<TWriterState>(std::move(client), std::move(transaction), std::move(specs), std::move(outStream), std::move(writer));
227+
state = ctx.HolderFactory.Create<TWriterState>(std::move(client), std::move(transaction), std::move(specs), std::move(outStream), std::move(writer), Representations.size(), Flow);
229228
}
230229

231230
void RegisterDependencies() const final {

0 commit comments

Comments
 (0)