Skip to content

Commit c5d3c1a

Browse files
authored
[RFC] Add checkpoint support for streamlookup (#9299)
1 parent dcbd0e0 commit c5d3c1a

File tree

5 files changed

+36 additions, -10 deletions (lines changed)

5 files changed

+36 additions, -10 deletions (lines changed)

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1402,8 +1402,20 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
14021402
}
14031403

14041404
void PollAsyncInput() {
1405+
if (!Running) {
1406+
CA_LOG_T("Skip polling inputs and sources because not running");
1407+
return;
1408+
}
1409+
1410+
CA_LOG_T("Poll inputs");
1411+
for (auto& [inputIndex, transform] : InputTransformsMap) {
1412+
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1413+
ContinueExecute(*resume);
1414+
}
1415+
}
1416+
14051417
// Don't produce any input from sources if we're about to save checkpoint.
1406-
if (!Running || (Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
1418+
if ((Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
14071419
CA_LOG_T("Skip polling sources because of pending checkpoint");
14081420
return;
14091421
}
@@ -1414,13 +1426,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
14141426
ContinueExecute(*resume);
14151427
}
14161428
}
1417-
1418-
CA_LOG_T("Poll inputs");
1419-
for (auto& [inputIndex, transform] : InputTransformsMap) {
1420-
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1421-
ContinueExecute(*resume);
1422-
}
1423-
}
14241429
}
14251430

14261431
void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {

ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ class TInputTransformStreamLookupBase
184184
}
185185
}
186186
finished = IsFinished();
187-
return 0;
187+
return AwaitingQueue.RowCount();
188188
}
189189

190190
TMaybe<google::protobuf::Any> ExtraData() override {

ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,18 @@ class TLocalTaskRunnerActor
144144
return false;
145145
}
146146
}
147+
for (const auto transformId: InputTransforms) {
148+
const auto t = TaskRunner->GetInputTransform(transformId);
149+
if (t) {
150+
auto [_, transform] = *t;
151+
if (!transform->Empty()) {
152+
return false;
153+
}
154+
if (transform->IsPending()) {
155+
return false;
156+
}
157+
}
158+
}
147159
return true;
148160
}
149161

@@ -443,6 +455,7 @@ class TLocalTaskRunnerActor
443455
for (auto i = 0; i != inputs.size(); ++i) {
444456
if (auto t = TaskRunner->GetInputTransform(i)) {
445457
inputTransforms[i] = *t;
458+
InputTransforms.emplace(i);
446459
}
447460
}
448461

@@ -490,6 +503,7 @@ class TLocalTaskRunnerActor
490503
const TTxId TxId;
491504
const ui64 TaskId;
492505
THashSet<ui32> Inputs;
506+
THashSet<ui32> InputTransforms;
493507
THashSet<ui32> Sources;
494508
TIntrusivePtr<NDq::IDqTaskRunner> TaskRunner;
495509
THashSet<ui32> InputChannelsWithDisabledCheckpoints;

ydb/library/yql/dq/runtime/dq_async_input.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace NYql::NDq {
66
class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInputBuffer> {
77
using TBaseImpl = TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInputBuffer>;
88
friend TBaseImpl;
9+
bool Pending = false;
910
public:
1011
TDqAsyncInputBufferStats PushStats;
1112
TDqInputStats PopStats;
@@ -32,7 +33,7 @@ class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInp
3233
}
3334

3435
void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override {
35-
Y_ABORT_UNLESS(!batch.empty() || !space);
36+
Pending = space != 0;
3637
if (!batch.empty()) {
3738
AddBatch(std::move(batch), space);
3839
}
@@ -41,6 +42,10 @@ class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInp
4142
virtual void Push(TDqSerializedBatch&&, i64) override {
4243
YQL_ENSURE(!"Unimplemented");
4344
}
45+
46+
bool IsPending() const override {
47+
return Pending;
48+
}
4449
};
4550

4651
IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer(

ydb/library/yql/dq/runtime/dq_async_input.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ class IDqAsyncInputBuffer : public IDqInput {
2323
virtual void Push(TDqSerializedBatch&& batch, i64 space) = 0;
2424

2525
virtual void Finish() = 0;
26+
27+
virtual bool IsPending() const { return false; };
2628
};
2729

2830
IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer(ui64 inputIndex, const TString& type, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes,

0 commit comments

Comments
 (0)