@@ -22,9 +22,10 @@ using TOutputRowColumnOrder = std::vector<std::pair<EOutputRowItemSource, ui64>>
22
22
23
23
// Design note: Base implementation is optimized for wide channels
24
24
class TInputTransformStreamLookupBase
25
- : public NActors::TActorBootstrapped <TInputTransformStreamLookupBase>
25
+ : public NActors::TActor <TInputTransformStreamLookupBase>
26
26
, public NYql::NDq::IDqComputeActorAsyncInput
27
27
{
28
+ using TActor = NActors::TActor<TInputTransformStreamLookupBase>;
28
29
public:
29
30
TInputTransformStreamLookupBase (
30
31
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
@@ -47,7 +48,8 @@ class TInputTransformStreamLookupBase
47
48
size_t cacheLimit,
48
49
std::chrono::seconds cacheTtl
49
50
)
50
- : Alloc(alloc)
51
+ : TActor(&TInputTransformStreamLookupBase::StateFunc)
52
+ , Alloc(alloc)
51
53
, HolderFactory(holderFactory)
52
54
, TypeEnv(typeEnv)
53
55
, InputIndex(inputIndex)
@@ -86,26 +88,6 @@ class TInputTransformStreamLookupBase
86
88
Free ();
87
89
}
88
90
89
- void Bootstrap () {
90
- Become (&TInputTransformStreamLookupBase::StateFunc);
91
- NDq::IDqAsyncIoFactory::TLookupSourceArguments lookupSourceArgs {
92
- .Alloc = Alloc,
93
- .KeyTypeHelper = KeyTypeHelper,
94
- .ParentId = SelfId (),
95
- .TaskCounters = TaskCounters,
96
- .LookupSource = Settings.GetRightSource ().GetLookupSource (),
97
- .KeyType = LookupKeyType,
98
- .PayloadType = LookupPayloadType,
99
- .TypeEnv = TypeEnv,
100
- .HolderFactory = HolderFactory,
101
- .MaxKeysInRequest = 1000 // TODO configure me
102
- };
103
- auto guard = Guard (*Alloc);
104
- auto [lookupSource, lookupSourceActor] = Factory->CreateDqLookupSource (Settings.GetRightSource ().GetProviderName (), std::move (lookupSourceArgs));
105
- MaxKeysInRequest = lookupSource->GetMaxSupportedKeysInRequest ();
106
- LookupSourceId = RegisterWithSameMailbox (lookupSourceActor);
107
- KeysForLookup = std::make_shared<IDqAsyncLookupSource::TUnboxedValueMap>(MaxKeysInRequest, KeyTypeHelper->GetValueHash (), KeyTypeHelper->GetValueEqual ());
108
- }
109
91
protected:
110
92
virtual NUdf::EFetchStatus FetchWideInputValue (NUdf::TUnboxedValue* inputRowItems) = 0;
111
93
virtual void PushOutputValue (NKikimr::NMiniKQL::TUnboxedValueBatch& batch, NUdf::TUnboxedValue* outputRowItems) = 0;
@@ -199,6 +181,7 @@ class TInputTransformStreamLookupBase
199
181
}
200
182
201
183
void PassAway () final {
184
+ InputFlowFetchStatus = NUdf::EFetchStatus::Finish;
202
185
Send (LookupSourceId, new NActors::TEvents::TEvPoison{});
203
186
Free ();
204
187
}
@@ -225,14 +208,38 @@ class TInputTransformStreamLookupBase
225
208
}
226
209
}
227
210
211
+ std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> GetKeysForLookup () { // must be called with mkql allocator
212
+ if (!KeysForLookup) {
213
+ Y_ENSURE (SelfId ());
214
+ Y_ENSURE (!LookupSourceId);
215
+ NDq::IDqAsyncIoFactory::TLookupSourceArguments lookupSourceArgs {
216
+ .Alloc = Alloc,
217
+ .KeyTypeHelper = KeyTypeHelper,
218
+ .ParentId = SelfId (),
219
+ .TaskCounters = TaskCounters,
220
+ .LookupSource = Settings.GetRightSource ().GetLookupSource (),
221
+ .KeyType = LookupKeyType,
222
+ .PayloadType = LookupPayloadType,
223
+ .TypeEnv = TypeEnv,
224
+ .HolderFactory = HolderFactory,
225
+ .MaxKeysInRequest = 1000 // TODO configure me
226
+ };
227
+ auto [lookupSource, lookupSourceActor] = Factory->CreateDqLookupSource (Settings.GetRightSource ().GetProviderName (), std::move (lookupSourceArgs));
228
+ MaxKeysInRequest = lookupSource->GetMaxSupportedKeysInRequest ();
229
+ LookupSourceId = RegisterWithSameMailbox (lookupSourceActor);
230
+ KeysForLookup = std::make_shared<IDqAsyncLookupSource::TUnboxedValueMap>(MaxKeysInRequest, KeyTypeHelper->GetValueHash (), KeyTypeHelper->GetValueEqual ());
231
+ }
232
+ return KeysForLookup;
233
+ }
234
+
228
235
i64 GetAsyncInputData (NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>&, bool & finished, i64 freeSpace) final {
229
236
Y_UNUSED (freeSpace);
230
237
auto startCycleCount = GetCycleCountFast ();
231
238
auto guard = BindAllocator ();
232
239
233
240
DrainReadyQueue (batch);
234
241
235
- if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && KeysForLookup ->empty ()) {
242
+ if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && GetKeysForLookup () ->empty ()) {
236
243
Y_DEBUG_ABORT_UNLESS (AwaitingQueue.empty ());
237
244
NUdf::TUnboxedValue* inputRowItems;
238
245
NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder (InputRowType->GetElementsCount (), inputRowItems);
0 commit comments