Skip to content

Commit 851dc32

Browse files
committed
Fixed pq async io tests
1 parent 1803581 commit 851dc32

File tree

4 files changed

+111
-55
lines changed

4 files changed

+111
-55
lines changed

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ TDqPqRdReadActor::TDqPqRdReadActor(
285285
const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry();
286286
const auto pb = std::make_unique<TProgramBuilder>(typeEnv, functionRegistry);
287287
const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(SourceParams.GetRowType()), *pb, Cerr);
288+
YQL_ENSURE(outputItemType, "Failed to parse row type: " << SourceParams.GetRowType());
288289
YQL_ENSURE(outputItemType->IsStruct(), "Row type is not struct");
289290
const auto structType = static_cast<TStructType*>(outputItemType);
290291

ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp

Lines changed: 85 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
#include <ydb/tests/fq/pq_async_io/ut_helpers.h>
22

3+
#include <yql/essentials/minikql/mkql_string_util.h>
4+
#include <yql/essentials/providers/common/schema/mkql/yql_mkql_schema.h>
5+
#include <yql/essentials/public/issue/yql_issue_message.h>
36
#include <yql/essentials/utils/yql_panic.h>
47

5-
#include <library/cpp/testing/unittest/registar.h>
68
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
79
#include <ydb/library/actors/testlib/test_runtime.h>
810
#include <ydb/library/yql/dq/actors/common/retry_queue.h>
11+
#include <ydb/library/yql/dq/common/rope_over_buffer.h>
912
#include <library/cpp/testing/unittest/gtest.h>
13+
#include <library/cpp/testing/unittest/registar.h>
1014

1115
#include <thread>
1216

@@ -15,7 +19,6 @@ namespace NYql::NDq {
1519
const ui64 PartitionId = 666;
1620

1721
struct TFixture : public TPqIoTestFixture {
18-
1922
TFixture() {
2023
LocalRowDispatcherId = CaSetup->Runtime->AllocateEdgeActor();
2124
Coordinator1Id = CaSetup->Runtime->AllocateEdgeActor();
@@ -43,6 +46,7 @@ struct TFixture : public TPqIoTestFixture {
4346

4447
NYql::NPq::NProto::TDqPqTopicSource copySettings = settings;
4548
auto [dqSource, dqSourceAsActor] = CreateDqPqRdReadActor(
49+
actor.TypeEnv,
4650
std::move(copySettings),
4751
0,
4852
NYql::NDq::TCollectStatsLevel::None,
@@ -132,12 +136,36 @@ struct TFixture : public TPqIoTestFixture {
132136
});
133137
}
134138

135-
void MockMessageBatch(ui64 offset, const std::vector<TString>& jsons, NActors::TActorId rowDispatcherId, ui64 generation = 1) {
139+
TRope SerializeValue(TFakeActor& actor, const NUdf::TUnboxedValue& value, const TString& type) {
140+
NKikimr::NMiniKQL::TType* typeMkql = NYql::NCommon::ParseTypeFromYson(TStringBuf(type), actor.ProgramBuilder, Cerr);
141+
UNIT_ASSERT_C(typeMkql, "Invalid mkql type: " << type);
142+
143+
NKikimr::NMiniKQL::TValuePackerTransport<false> packer(typeMkql);
144+
return NYql::MakeReadOnlyRope(packer.Pack(value));
145+
}
146+
147+
TRope SerializeInt(TFakeActor& actor, ui64 item) {
148+
with_lock(actor.Alloc) {
149+
NUdf::TUnboxedValue value = NUdf::TUnboxedValuePod(item);
150+
return SerializeValue(actor, value, "[DataType; Uint64]");
151+
}
152+
}
153+
154+
TRope SerializeString(TFakeActor& actor, const TString& item) {
155+
with_lock(actor.Alloc) {
156+
NUdf::TUnboxedValue value = NKikimr::NMiniKQL::MakeString(item);
157+
return SerializeValue(actor, value, "[DataType; String]");
158+
}
159+
}
160+
161+
// Supported schema (Uint64, String)
162+
void MockMessageBatch(ui64 offset, const std::vector<std::pair<ui64, TString>>& messages, NActors::TActorId rowDispatcherId, ui64 generation = 1) {
136163
CaSetup->Execute([&](TFakeActor& actor) {
137164
auto event = new NFq::TEvRowDispatcher::TEvMessageBatch();
138-
for (const auto& json :jsons) {
165+
for (const auto& item : messages) {
139166
NFq::NRowDispatcherProto::TEvMessage message;
140-
message.SetJson(json);
167+
message.AddColumnsPayload(event->AddPayload(SerializeInt(actor, item.first)));
168+
message.AddColumnsPayload(event->AddPayload(SerializeString(actor, item.second)));
141169
message.SetOffset(offset++);
142170
*event->Record.AddMessages() = message;
143171
}
@@ -150,7 +178,8 @@ struct TFixture : public TPqIoTestFixture {
150178
void MockSessionError() {
151179
CaSetup->Execute([&](TFakeActor& actor) {
152180
auto event = new NFq::TEvRowDispatcher::TEvSessionError();
153-
event->Record.SetMessage("A problem has been detected and session has been shut down to prevent damage your life");
181+
event->Record.SetStatusCode(::NYql::NDqProto::StatusIds::BAD_REQUEST);
182+
IssueToMessage(TIssue("A problem has been detected and session has been shut down to prevent damage your life"), event->Record.AddIssues());
154183
event->Record.SetPartitionId(PartitionId);
155184
CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, RowDispatcher1, event, 0, 1));
156185
});
@@ -177,7 +206,7 @@ struct TFixture : public TPqIoTestFixture {
177206
watermarksBeforeIter == watermarkBeforePositions.end() ||
178207
*watermarksBeforeIter > expectedPos,
179208
"Watermark before item on position " << expectedPos << " was expected");
180-
UNIT_ASSERT_EQUAL(std::get<T>(item), expected.at(expectedPos));
209+
UNIT_ASSERT_VALUES_EQUAL(std::get<T>(item), expected.at(expectedPos));
181210
expectedPos++;
182211
}
183212
}
@@ -206,7 +235,7 @@ struct TFixture : public TPqIoTestFixture {
206235

207236
void StartSession(NYql::NPq::NProto::TDqPqTopicSource& settings, i64 freeSpace = 1_MB) {
208237
InitRdSource(settings, freeSpace);
209-
SourceRead<TString>(UVParser);
238+
SourceRead<std::pair<ui64, TString>>(UVPairParser);
210239
ExpectCoordinatorChangesSubscribe();
211240

212241
MockCoordinatorChanged(Coordinator1Id);
@@ -217,21 +246,21 @@ struct TFixture : public TPqIoTestFixture {
217246
MockAck(RowDispatcher1);
218247
}
219248

220-
void ProcessSomeJsons(ui64 offset, const std::vector<TString>& jsons, NActors::TActorId rowDispatcherId,
221-
std::function<std::vector<TString>(const NUdf::TUnboxedValue&)> uvParser = UVParser, ui64 generation = 1) {
249+
void ProcessSomeMessages(ui64 offset, const std::vector<std::pair<ui64, TString>>& messages, NActors::TActorId rowDispatcherId,
250+
std::function<std::vector<std::pair<ui64, TString>>(const NUdf::TUnboxedValue&)> uvParser = UVPairParser, ui64 generation = 1) {
222251
MockNewDataArrived(rowDispatcherId, generation);
223252
ExpectGetNextBatch(rowDispatcherId);
224253

225-
MockMessageBatch(offset, jsons, rowDispatcherId, generation);
254+
MockMessageBatch(offset, messages, rowDispatcherId, generation);
226255

227-
auto result = SourceReadDataUntil<TString>(uvParser, jsons.size());
228-
AssertDataWithWatermarks(result, jsons, {});
256+
auto result = SourceReadDataUntil<std::pair<ui64, TString>>(uvParser, messages.size());
257+
AssertDataWithWatermarks(result, messages, {});
229258
}
230259

231-
const TString Json1 = "{\"dt\":100,\"value\":\"value1\"}";
232-
const TString Json2 = "{\"dt\":200,\"value\":\"value2\"}";
233-
const TString Json3 = "{\"dt\":300,\"value\":\"value3\"}";
234-
const TString Json4 = "{\"dt\":400,\"value\":\"value4\"}";
260+
const std::pair<ui64, TString> Message1 = {100, "value1"};
261+
const std::pair<ui64, TString> Message2 = {200, "value2"};
262+
const std::pair<ui64, TString> Message3 = {300, "value3"};
263+
const std::pair<ui64, TString> Message4 = {400, "value4"};
235264

236265
NYql::NPq::NProto::TDqPqTopicSource Source1 = BuildPqTopicSourceSettings("topicName");
237266

@@ -245,7 +274,7 @@ struct TFixture : public TPqIoTestFixture {
245274
Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
246275
Y_UNIT_TEST_F(TestReadFromTopic, TFixture) {
247276
StartSession(Source1);
248-
ProcessSomeJsons(0, {Json1, Json2}, RowDispatcher1);
277+
ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1);
249278
}
250279

251280
Y_UNIT_TEST_F(SessionError, TFixture) {
@@ -257,7 +286,7 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
257286

258287
bool failured = false;
259288
while (Now() < deadline) {
260-
SourceRead<TString>(UVParser);
289+
SourceRead<std::pair<ui64, TString>>(UVPairParser);
261290
if (future.HasValue()) {
262291
UNIT_ASSERT_STRING_CONTAINS(future.GetValue().ToOneLineString(), "damage your life");
263292
failured = true;
@@ -273,17 +302,18 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
273302
MockNewDataArrived(RowDispatcher1);
274303
ExpectGetNextBatch(RowDispatcher1);
275304

276-
const std::vector<TString> data1 = {Json1, Json2};
305+
const std::vector<std::pair<ui64, TString>> data1 = {Message1, Message2};
277306
MockMessageBatch(0, data1, RowDispatcher1);
278307

279-
const std::vector<TString> data2 = {Json3, Json4};
308+
const std::vector<std::pair<ui64, TString>> data2 = {Message3, Message4};
280309
MockMessageBatch(2, data2, RowDispatcher1);
281310

282-
auto result = SourceReadDataUntil<TString>(UVParser, 1, 1);
283-
std::vector<TString> expected{data1};
311+
auto result = SourceReadDataUntil<std::pair<ui64, TString>>(UVPairParser, 1, 1);
312+
std::vector<std::pair<ui64, TString>> expected{data1};
284313
AssertDataWithWatermarks(result, expected, {});
285314

286-
UNIT_ASSERT_EQUAL(SourceRead<TString>(UVParser, 0).size(), 0);
315+
auto readSize = SourceRead<std::pair<ui64, TString>>(UVPairParser, 0).size();
316+
UNIT_ASSERT_VALUES_EQUAL(readSize, 0);
287317
}
288318

289319
Y_UNIT_TEST(TestSaveLoadPqRdRead) {
@@ -292,17 +322,17 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
292322
{
293323
TFixture f;
294324
f.StartSession(f.Source1);
295-
f.ProcessSomeJsons(0, {f.Json1, f.Json2}, f.RowDispatcher1); // offsets: 0, 1
325+
f.ProcessSomeMessages(0, {f.Message1, f.Message2}, f.RowDispatcher1); // offsets: 0, 1
296326

297327
f.SaveSourceState(CreateCheckpoint(), state);
298328
Cerr << "State saved" << Endl;
299329
}
300330
{
301331
TFixture f;
302332
f.InitRdSource(f.Source1);
303-
f.SourceRead<TString>(UVParser);
333+
f.SourceRead<std::pair<ui64, TString>>(UVPairParser);
304334
f.LoadSource(state);
305-
f.SourceRead<TString>(UVParser);
335+
f.SourceRead<std::pair<ui64, TString>>(UVPairParser);
306336
f.ExpectCoordinatorChangesSubscribe();
307337

308338
f.MockCoordinatorChanged(f.Coordinator1Id);
@@ -312,17 +342,17 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
312342
f.ExpectStartSession(2, f.RowDispatcher1);
313343
f.MockAck(f.RowDispatcher1);
314344

315-
f.ProcessSomeJsons(2, {f.Json3}, f.RowDispatcher1); // offsets: 2
345+
f.ProcessSomeMessages(2, {f.Message3}, f.RowDispatcher1); // offsets: 2
316346
state.Data.clear();
317347
f.SaveSourceState(CreateCheckpoint(), state);
318348
Cerr << "State saved" << Endl;
319349
}
320350
{
321351
TFixture f;
322352
f.InitRdSource(f.Source1);
323-
f.SourceRead<TString>(UVParser);
353+
f.SourceRead<std::pair<ui64, TString>>(UVPairParser);
324354
f.LoadSource(state);
325-
f.SourceRead<TString>(UVParser);
355+
f.SourceRead<std::pair<ui64, TString>>(UVPairParser);
326356
f.ExpectCoordinatorChangesSubscribe();
327357

328358
f.MockCoordinatorChanged(f.Coordinator1Id);
@@ -332,29 +362,29 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
332362
f.ExpectStartSession(3, f.RowDispatcher1);
333363
f.MockAck(f.RowDispatcher1);
334364

335-
f.ProcessSomeJsons(3, {f.Json4}, f.RowDispatcher1); // offsets: 3
365+
f.ProcessSomeMessages(3, {f.Message4}, f.RowDispatcher1); // offsets: 3
336366
}
337367
}
338368

339369
Y_UNIT_TEST_F(CoordinatorChanged, TFixture) {
340370
StartSession(Source1);
341-
ProcessSomeJsons(0, {Json1, Json2}, RowDispatcher1);
342-
MockMessageBatch(2, {Json3}, RowDispatcher1);
371+
ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1);
372+
MockMessageBatch(2, {Message3}, RowDispatcher1);
343373

344374
// change active Coordinator
345375
MockCoordinatorChanged(Coordinator2Id);
346376
ExpectStopSession(RowDispatcher1);
347377

348-
auto result = SourceReadDataUntil<TString>(UVParser, 1);
349-
AssertDataWithWatermarks(result, {Json3}, {});
378+
auto result = SourceReadDataUntil<std::pair<ui64, TString>>(UVPairParser, 1);
379+
AssertDataWithWatermarks(result, {Message3}, {});
350380

351381
auto req = ExpectCoordinatorRequest(Coordinator2Id);
352382
MockCoordinatorResult(RowDispatcher2, req->Cookie);
353383

354384
ExpectStartSession(3, RowDispatcher2, 2);
355385
MockAck(RowDispatcher2, 2);
356386

357-
ProcessSomeJsons(3, {Json4}, RowDispatcher2, UVParser, 2);
387+
ProcessSomeMessages(3, {Message4}, RowDispatcher2, UVPairParser, 2);
358388

359389
MockHeartbeat(RowDispatcher1, 1); // old generation
360390
ExpectStopSession(RowDispatcher1);
@@ -363,30 +393,30 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
363393
Y_UNIT_TEST_F(Backpressure, TFixture) {
364394
StartSession(Source1, 2_KB);
365395

366-
TString json(900, 'c');
367-
ProcessSomeJsons(0, {json}, RowDispatcher1);
396+
std::pair<ui64, TString> message = {100500, TString(900, 'c')};
397+
ProcessSomeMessages(0, {message}, RowDispatcher1);
368398

369399
MockNewDataArrived(RowDispatcher1);
370400
ExpectGetNextBatch(RowDispatcher1);
371-
MockMessageBatch(0, {json, json, json}, RowDispatcher1);
401+
MockMessageBatch(0, {message, message, message}, RowDispatcher1);
372402

373403
MockNewDataArrived(RowDispatcher1);
374404
ASSERT_THROW(
375405
CaSetup->Runtime->GrabEdgeEvent<NFq::TEvRowDispatcher::TEvGetNextBatch>(RowDispatcher1, TDuration::Seconds(0)),
376406
NActors::TEmptyEventQueueException);
377407

378-
auto result = SourceReadDataUntil<TString>(UVParser, 3);
379-
AssertDataWithWatermarks(result, {json, json, json}, {});
408+
auto result = SourceReadDataUntil<std::pair<ui64, TString>>(UVPairParser, 3);
409+
AssertDataWithWatermarks(result, {message, message, message}, {});
380410
ExpectGetNextBatch(RowDispatcher1);
381411

382-
MockMessageBatch(3, {Json1}, RowDispatcher1);
383-
result = SourceReadDataUntil<TString>(UVParser, 1);
384-
AssertDataWithWatermarks(result, {Json1}, {});
412+
MockMessageBatch(3, {Message1}, RowDispatcher1);
413+
result = SourceReadDataUntil<std::pair<ui64, TString>>(UVPairParser, 1);
414+
AssertDataWithWatermarks(result, {Message1}, {});
385415
}
386416

387417
Y_UNIT_TEST_F(RowDispatcherIsRestarted, TFixture) {
388418
StartSession(Source1);
389-
ProcessSomeJsons(0, {Json1, Json2}, RowDispatcher1);
419+
ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1);
390420
MockDisconnected();
391421
MockConnected();
392422
MockUndelivered();
@@ -396,7 +426,7 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
396426
ExpectStartSession(2, RowDispatcher1, 2);
397427
MockAck(RowDispatcher1, 2);
398428

399-
ProcessSomeJsons(2, {Json3}, RowDispatcher1, UVParser, 2);
429+
ProcessSomeMessages(2, {Message3}, RowDispatcher1, UVPairParser, 2);
400430
}
401431

402432
Y_UNIT_TEST_F(IgnoreMessageIfNoSessions, TFixture) {
@@ -406,15 +436,22 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
406436
}
407437

408438
Y_UNIT_TEST_F(MetadataFields, TFixture) {
439+
auto metadataUVParser = [](const NUdf::TUnboxedValue& item) -> std::vector<std::pair<ui64, TString>> {
440+
UNIT_ASSERT_VALUES_EQUAL(item.GetListLength(), 3);
441+
auto stringElement = item.GetElement(2);
442+
return { {item.GetElement(1).Get<ui64>(), TString(stringElement.AsStringRef())} };
443+
};
444+
409445
auto source = BuildPqTopicSourceSettings("topicName");
410446
source.AddMetadataFields("_yql_sys_create_time");
447+
source.SetRowType("[StructType; [[_yql_sys_create_time; [DataType; Uint32]]; [dt; [DataType; Uint64]]; [value; [DataType; String]]]]");
411448
StartSession(source);
412-
ProcessSomeJsons(0, {Json1}, RowDispatcher1, UVParserWithMetadatafields);
449+
ProcessSomeMessages(0, {Message1}, RowDispatcher1, metadataUVParser);
413450
}
414451

415452
Y_UNIT_TEST_F(IgnoreCoordinatorResultIfWrongState, TFixture) {
416453
StartSession(Source1);
417-
ProcessSomeJsons(0, {Json1, Json2}, RowDispatcher1);
454+
ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1);
418455

419456
MockCoordinatorChanged(Coordinator2Id);
420457
auto req = ExpectCoordinatorRequest(Coordinator2Id);

0 commit comments

Comments
 (0)