Skip to content

Commit a0078aa

Browse files
authored
YQ-3753 Shared reading: metadatafields (#10370)
1 parent 5a2bffd commit a0078aa

File tree

4 files changed

+50
-18
lines changed

4 files changed

+50
-18
lines changed

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
165165
};
166166

167167
TMap<ui64, SessionInfo> Sessions;
168+
const THolderFactory& HolderFactory;
168169

169170
public:
170171
TDqPqRdReadActor(
@@ -239,7 +240,7 @@ TDqPqRdReadActor::TDqPqRdReadActor(
239240
TCollectStatsLevel statsLevel,
240241
const TTxId& txId,
241242
ui64 taskId,
242-
const THolderFactory& /*holderFactory*/,
243+
const THolderFactory& holderFactory,
243244
NPq::NProto::TDqPqTopicSource&& sourceParams,
244245
NPq::NProto::TDqReadTaskParams&& readParams,
245246
const NActors::TActorId& computeActorId,
@@ -251,6 +252,7 @@ TDqPqRdReadActor::TDqPqRdReadActor(
251252
, Token(token)
252253
, LocalRowDispatcherActorId(localRowDispatcherActorId)
253254
, Metrics(txId, taskId, counters)
255+
, HolderFactory(holderFactory)
254256
{
255257
MetadataFields.reserve(SourceParams.MetadataFieldsSize());
256258
TPqMetaExtractor fieldsExtractor;
@@ -259,7 +261,7 @@ TDqPqRdReadActor::TDqPqRdReadActor(
259261
}
260262

261263
IngressStats.Level = statsLevel;
262-
SRC_LOG_D("Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString());
264+
SRC_LOG_D("Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString() << ", metadatafields: " << JoinSeq(',', SourceParams.GetMetadataFields()));
263265
}
264266

265267
void TDqPqRdReadActor::ProcessState() {
@@ -649,8 +651,21 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev)
649651
// Builds the output value for one message payload and returns it together with
// the number of bytes accounted for it (currently always data.size()).
// After this change the shape of the value depends on MetadataFields:
//   - empty:     a plain string holding `data`;
//   - non-empty: an array of MetadataFields.size() + 1 elements whose element 0
//                is the payload string, followed by one slot per metadata field.
std::pair<NUdf::TUnboxedValuePod, i64> TDqPqRdReadActor::CreateItem(const TString& data) {
650652
i64 usedSpace = 0;
651653
NUdf::TUnboxedValuePod item;
652-
item = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.data(), data.size()));
654+
// No metadata fields requested: keep the previous behavior and return the bare payload string.
if (MetadataFields.empty()) {
655+
item = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.data(), data.size()));
656+
usedSpace += data.size();
657+
return std::make_pair(item, usedSpace);
658+
}
659+
660+
// itemPtr is an out-parameter of CreateDirectArrayHolder; it is then used as a
// write cursor over the array elements (presumably pointing at element 0 — confirm
// against THolderFactory).
NUdf::TUnboxedValue* itemPtr;
661+
item = HolderFactory.CreateDirectArrayHolder(MetadataFields.size() + 1, itemPtr);
662+
// Element 0 is always the payload itself.
*(itemPtr++) = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.data(), data.size()));
653663
// Only the payload bytes are counted; the metadata slots below add nothing to usedSpace.
usedSpace += data.size();
664+
665+
// Neither `name` nor `extractor` is used yet: every metadata slot is filled with a
// placeholder TUnboxedValuePod(0) instead of a value extracted from the message.
for ([[maybe_unused]] const auto& [name, extractor] : MetadataFields) {
666+
auto ub = NYql::NUdf::TUnboxedValuePod(0); // TODO: use real values
667+
*(itemPtr++) = std::move(ub);
668+
}
654669
return std::make_pair(item, usedSpace);
655670
}
656671

ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ struct TFixture : public TPqIoTestFixture {
2222
}
2323

2424
void InitRdSource(
25-
NYql::NPq::NProto::TDqPqTopicSource&& settings,
25+
const NYql::NPq::NProto::TDqPqTopicSource& settings,
2626
i64 freeSpace = 1_MB)
2727
{
2828
CaSetup->Execute([&](TFakeActor& actor) {
@@ -38,8 +38,9 @@ struct TFixture : public TPqIoTestFixture {
3838
const THashMap<TString, TString> secureParams;
3939
const THashMap<TString, TString> taskParams { {"pq", serializedParams} };
4040

41+
NYql::NPq::NProto::TDqPqTopicSource copySettings = settings;
4142
auto [dqSource, dqSourceAsActor] = CreateDqPqRdReadActor(
42-
std::move(settings),
43+
std::move(copySettings),
4344
0,
4445
NYql::NDq::TCollectStatsLevel::None,
4546
"query_1",
@@ -191,8 +192,8 @@ struct TFixture : public TPqIoTestFixture {
191192
}
192193

193194

194-
void StartSession() {
195-
InitRdSource(BuildPqTopicSourceSettings("topicName"));
195+
void StartSession(NYql::NPq::NProto::TDqPqTopicSource& settings) {
196+
InitRdSource(settings);
196197
SourceRead<TString>(UVParser);
197198
ExpectCoordinatorChangesSubscribe();
198199

@@ -204,13 +205,14 @@ struct TFixture : public TPqIoTestFixture {
204205
MockAck(RowDispatcher1);
205206
}
206207

207-
// Test helper: pushes one batch of `jsons` (starting at `offset`) through the mocked
// row dispatcher `rowDispatcherId`, reads jsons.size() items back from the source and
// asserts they match `jsons` (with an empty third argument to AssertDataWithWatermarks).
// The added `uvParser` parameter selects how each unboxed value is converted to strings;
// it defaults to UVParser, so existing call sites are unaffected.
void ProcessSomeJsons(ui64 offset, const std::vector<TString>& jsons, NActors::TActorId rowDispatcherId) {
208+
void ProcessSomeJsons(ui64 offset, const std::vector<TString>& jsons, NActors::TActorId rowDispatcherId,
209+
std::function<std::vector<TString>(const NUdf::TUnboxedValue&)> uvParser = UVParser) {
208210
MockNewDataArrived(rowDispatcherId);
209211
ExpectGetNextBatch(rowDispatcherId);
210212

211213
MockMessageBatch(offset, jsons, rowDispatcherId);
212214

213-
auto result = SourceReadDataUntil<TString>(UVParser, jsons.size());
215+
// Parse with the caller-supplied parser rather than the hard-coded UVParser.
auto result = SourceReadDataUntil<TString>(uvParser, jsons.size());
214216
AssertDataWithWatermarks(result, jsons, {});
215217
}
216218

@@ -219,6 +221,8 @@ struct TFixture : public TPqIoTestFixture {
219221
const TString Json3 = "{\"dt\":300,\"value\":\"value3\"}";
220222
const TString Json4 = "{\"dt\":400,\"value\":\"value4\"}";
221223

224+
NYql::NPq::NProto::TDqPqTopicSource Source1 = BuildPqTopicSourceSettings("topicName");
225+
222226
NActors::TActorId LocalRowDispatcherId;
223227
NActors::TActorId Coordinator1Id;
224228
NActors::TActorId Coordinator2Id;
@@ -228,12 +232,12 @@ struct TFixture : public TPqIoTestFixture {
228232

229233
Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
230234
// Starts a session (now with the fixture's Source1 settings passed explicitly) and
// checks that two JSON messages at offset 0 are read back from RowDispatcher1.
Y_UNIT_TEST_F(TestReadFromTopic, TFixture) {
231-
StartSession();
235+
StartSession(Source1);
232236
ProcessSomeJsons(0, {Json1, Json2}, RowDispatcher1);
233237
}
234238

235239
Y_UNIT_TEST_F(SessionError, TFixture) {
236-
StartSession();
240+
StartSession(Source1);
237241

238242
TInstant deadline = Now() + TDuration::Seconds(5);
239243
auto future = CaSetup->AsyncInputPromises.FatalError.GetFuture();
@@ -252,7 +256,7 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
252256
}
253257

254258
Y_UNIT_TEST_F(ReadWithFreeSpace, TFixture) {
255-
StartSession();
259+
StartSession(Source1);
256260

257261
MockNewDataArrived(RowDispatcher1);
258262
ExpectGetNextBatch(RowDispatcher1);
@@ -275,15 +279,15 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
275279

276280
{
277281
TFixture f;
278-
f.StartSession();
282+
f.StartSession(f.Source1);
279283
f.ProcessSomeJsons(0, {f.Json1, f.Json2}, f.RowDispatcher1); // offsets: 0, 1
280284

281285
f.SaveSourceState(CreateCheckpoint(), state);
282286
Cerr << "State saved" << Endl;
283287
}
284288
{
285289
TFixture f;
286-
f.InitRdSource(BuildPqTopicSourceSettings("topicName"));
290+
f.InitRdSource(f.Source1);
287291
f.SourceRead<TString>(UVParser);
288292
f.LoadSource(state);
289293
f.SourceRead<TString>(UVParser);
@@ -303,7 +307,7 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
303307
}
304308
{
305309
TFixture f;
306-
f.InitRdSource(BuildPqTopicSourceSettings("topicName"));
310+
f.InitRdSource(f.Source1);
307311
f.SourceRead<TString>(UVParser);
308312
f.LoadSource(state);
309313
f.SourceRead<TString>(UVParser);
@@ -321,7 +325,7 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
321325
}
322326

323327
Y_UNIT_TEST_F(CoordinatorChanged, TFixture) {
324-
StartSession();
328+
StartSession(Source1);
325329
ProcessSomeJsons(0, {Json1, Json2}, RowDispatcher1);
326330
MockMessageBatch(2, {Json3}, RowDispatcher1);
327331

@@ -342,7 +346,7 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
342346
}
343347

344348
Y_UNIT_TEST_F(RowDispatcherIsRestarted, TFixture) {
345-
StartSession();
349+
StartSession(Source1);
346350
ProcessSomeJsons(0, {Json1, Json2}, RowDispatcher1);
347351
MockDisconnected();
348352
MockConnected();
@@ -361,5 +365,11 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
361365
MockCoordinatorChanged(Coordinator2Id);
362366
MockSessionError();
363367
}
364-
}
368+
369+
// Starts a session whose source settings request one metadata field
// ("_yql_sys_create_time") and checks that a single JSON message is still read back.
// The items are parsed with UVParserWithMetadatafields, so only the payload is compared;
// the value of the metadata field itself is not asserted here.
Y_UNIT_TEST_F(MetadataFields, TFixture) {
370+
auto source = BuildPqTopicSourceSettings("topicName");
371+
source.AddMetadataFields("_yql_sys_create_time");
372+
StartSession(source);
373+
ProcessSomeJsons(0, {Json1}, RowDispatcher1, UVParserWithMetadatafields);
374+
}
365375
} // NYql::NDq

ydb/tests/fq/pq_async_io/ut_helpers.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,12 @@ std::vector<TString> UVParser(const NUdf::TUnboxedValue& item) {
243243
return { TString(item.AsStringRef()) };
244244
}
245245

246+
// Parser for items that are containers rather than plain strings: takes element 0 of
// `item`, reads it as a string and returns it as a one-element vector. Any further
// elements of `item` are ignored.
std::vector<TString> UVParserWithMetadatafields(const NUdf::TUnboxedValue& item) {
247+
const auto& cell = item.GetElement(0);
248+
TString str(cell.AsStringRef());
249+
return {str};
250+
}
251+
246252
void TPqIoTestFixture::AsyncOutputWrite(std::vector<TString> data, TMaybe<NDqProto::TCheckpoint> checkpoint) {
247253
CaSetup->AsyncOutputWrite([data](NKikimr::NMiniKQL::THolderFactory& factory) {
248254
NKikimr::NMiniKQL::TUnboxedValueBatch batch;

ydb/tests/fq/pq_async_io/ut_helpers.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,5 +125,6 @@ void AddReadRule(
125125
const TString& streamName);
126126

127127
std::vector<TString> UVParser(const NUdf::TUnboxedValue& item);
128+
std::vector<TString> UVParserWithMetadatafields(const NUdf::TUnboxedValue& item);
128129

129130
}

0 commit comments

Comments
 (0)