Skip to content

Commit 065bbdf

Browse files
committed
Fixed unit tests
1 parent 8a696cb commit 065bbdf

File tree

12 files changed

+193
-132
lines changed

12 files changed

+193
-132
lines changed

ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
#include <util/string/builder.h>
44

5-
#include <ydb/library/yql/dq/actors/dq.h>
6-
75
#include <yql/essentials/public/purecalc/common/interface.h>
86

97
namespace NFq::NRowDispatcher {
@@ -71,7 +69,7 @@ const NYql::TIssues& TStatus::GetIssues() const {
7169
}
7270

7371
TString TStatus::ToString() const {
74-
return TStringBuilder() << "Status: " << NYql::NDq::DqStatusToYdbStatus(Status) << ", Issues: " << Issues.ToOneLineString();
72+
return TStringBuilder() << "Status: " << NYql::NDqProto::StatusIds_StatusCode_Name(Status) << ", Issues: " << Issues.ToOneLineString();
7573
}
7674

7775
TStatus& TStatus::AddParentIssue(NYql::TIssue issue) {

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ class TTopicFormatHandler : public ITopicFormatHandler {
305305
}
306306

307307
void FilterData(ui64 numberRows) {
308-
LOG_ROW_DISPATCHER_TRACE("Send " << numberRows << " to filters");
308+
LOG_ROW_DISPATCHER_TRACE("Send " << numberRows << " messages to filters");
309309
if (!numberRows) {
310310
return;
311311
}

ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44

55
#include <ydb/core/testlib/basics/appdata.h>
66

7-
#include <ydb/library/yql/dq/actors/dq.h>
7+
#include <ydb/library/yql/dq/common/rope_over_buffer.h>
88

9+
#include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
910
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
11+
#include <yql/essentials/providers/common/schema/mkql/yql_mkql_schema.h>
1012

1113
namespace NFq::NRowDispatcher::NTests {
1214

@@ -42,6 +44,7 @@ void TBaseFixture::SetUp(NUnitTest::TTestContext&) {
4244
TAutoPtr<NKikimr::TAppPrepare> app = new NKikimr::TAppPrepare();
4345
Runtime.SetLogBackend(NActors::CreateStderrBackend());
4446
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NActors::NLog::PRI_TRACE);
47+
Runtime.SetDispatchTimeout(TDuration::Seconds(5));
4548
Runtime.Initialize(app->Unwrap());
4649
}
4750

@@ -59,15 +62,37 @@ NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage TBaseFixture::GetM
5962
return NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage(data, nullptr, info, nullptr);
6063
}
6164

65+
NYql::NUdf::TUnboxedValue TBaseFixture::ParseValue(const TRope& serializedColumn, const TString& type) const {
66+
NKikimr::NMiniKQL::TType* typeMkql = NYql::NCommon::ParseTypeFromYson(TStringBuf(type), *ProgramBuilder, Cerr);
67+
UNIT_ASSERT_C(typeMkql, "Invalid mkql type: " << type);
68+
69+
NKikimr::NMiniKQL::TValuePackerTransport<false> unpacker(typeMkql);
70+
return unpacker.Unpack(NYql::MakeChunkedBuffer(serializedColumn), *HolderFactory);
71+
}
72+
73+
void TBaseFixture::CheckStringColumn(const TRope& serializedColumn, const TString& expectedData) const {
74+
with_lock(Alloc) {
75+
NYql::NUdf::TUnboxedValue value = ParseValue(serializedColumn, "[DataType; String]");
76+
UNIT_ASSERT_VALUES_EQUAL(expectedData, TString(value.AsStringRef()));
77+
}
78+
}
79+
80+
void TBaseFixture::CheckIntColumn(const TRope& serializedColumn, ui64 expectedData) const {
81+
with_lock(Alloc) {
82+
NYql::NUdf::TUnboxedValue value = ParseValue(serializedColumn, "[DataType; Uint64]");
83+
UNIT_ASSERT_VALUES_EQUAL(expectedData, value.Get<ui64>());
84+
}
85+
}
86+
6287
//// Functions
6388

6489
void CheckSuccess(const TStatus& status) {
6590
UNIT_ASSERT_C(status.IsSuccess(), TStringBuilder() << "Status is not success, " << status.ToString());
6691
}
6792

6893
void CheckError(const TStatus& status, TStatus::TStatucCode expectedStatusCode, const TString& expectedMessage) {
69-
UNIT_ASSERT_C(status.GetStatus() == expectedStatusCode, TStringBuilder() << "Expected error status " << NYql::NDq::DqStatusToYdbStatus(expectedStatusCode) << ", but got: " << status.ToString());
70-
UNIT_ASSERT_STRING_CONTAINS_C(status.GetIssues().ToOneLineString(), expectedMessage, TStringBuilder() << "Unexpected error message, Status: " << NYql::NDq::DqStatusToYdbStatus(status.GetStatus()));
94+
UNIT_ASSERT_C(status.GetStatus() == expectedStatusCode, TStringBuilder() << "Expected error status " << NYql::NDqProto::StatusIds_StatusCode_Name(expectedStatusCode) << ", but got: " << status.ToString());
95+
UNIT_ASSERT_STRING_CONTAINS_C(status.GetIssues().ToOneLineString(), expectedMessage, TStringBuilder() << "Unexpected error message, Status: " << NYql::NDqProto::StatusIds_StatusCode_Name(status.GetStatus()));
7196
}
7297

7398
} // namespace NFq::NRowDispatcher::NTests

ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ class TBaseFixture : public NUnitTest::TBaseFixture {
2525
public:
2626
static NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage GetMessage(ui64 offset, const TString& data);
2727

28+
NYql::NUdf::TUnboxedValue ParseValue(const TRope& serializedColumn, const TString& type) const;
29+
void CheckStringColumn(const TRope& serializedColumn, const TString& expectedData) const;
30+
void CheckIntColumn(const TRope& serializedColumn, ui64 expectedData) const;
31+
2832
public:
2933
NKikimr::NMiniKQL::TMemoryUsageInfo MemoryInfo;
3034
NKikimr::NMiniKQL::TScopedAlloc Alloc;

ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ PEERDIR(
1616
ydb/core/testlib/basics
1717

1818
ydb/library/yql/dq/actors
19+
ydb/library/yql/dq/common
1920

2021
yql/essentials/minikql
2122
yql/essentials/minikql/invoke_builtins
2223
yql/essentials/minikql/computation
24+
yql/essentials/providers/common/schema/mkql
2325
)
2426

2527
YQL_LAST_ABI_VERSION()

ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,6 @@
22

33
#include <ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h>
44

5-
#include <ydb/library/yql/dq/common/rope_over_buffer.h>
6-
7-
#include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
8-
#include <yql/essentials/providers/common/schema/mkql/yql_mkql_schema.h>
9-
105
namespace NFq::NRowDispatcher::NTests {
116

127
namespace {
@@ -25,7 +20,6 @@ class TFormatHadlerFixture : public TBaseFixture {
2520
: ClientId(clientId)
2621
, Columns(columns)
2722
, WhereFilter(whereFilter)
28-
, FactorySettings{.EnabledLLVM = false}
2923
, Callback(callback)
3024
, ExpectedFilteredRows(expectedFilteredRows)
3125
{}
@@ -55,16 +49,16 @@ class TFormatHadlerFixture : public TBaseFixture {
5549
}
5650

5751
public:
58-
const TVector<TSchemaColumn>& GetColumns() const override {
52+
TVector<TSchemaColumn> GetColumns() const override {
5953
return Columns;
6054
}
6155

6256
const TString& GetWhereFilter() const override {
6357
return WhereFilter;
6458
}
6559

66-
const IPureCalcProgramFactory::TSettings& GetPureCalcSettings() const override {
67-
return FactorySettings;
60+
IPureCalcProgramFactory::TSettings GetPureCalcSettings() const override {
61+
return {.EnabledLLVM = false};
6862
}
6963

7064
virtual NActors::TActorId GetClientId() const override {
@@ -101,7 +95,6 @@ class TFormatHadlerFixture : public TBaseFixture {
10195
const NActors::TActorId ClientId;
10296
const TVector<TSchemaColumn> Columns;
10397
const TString WhereFilter;
104-
const IPureCalcProgramFactory::TSettings FactorySettings;
10598
const TCallback Callback;
10699

107100
bool Frozen = false;
@@ -169,16 +162,6 @@ class TFormatHadlerFixture : public TBaseFixture {
169162
FormatHandler->RemoveClient(clientId);
170163
}
171164

172-
void CheckStringColumn(std::shared_ptr<TRope> serializedColumn, const TString& expectedData) {
173-
with_lock(Alloc) {
174-
NKikimr::NMiniKQL::TType* typeMkql = NYql::NCommon::ParseTypeFromYson(TStringBuf("[DataType; String]"), *ProgramBuilder, Cerr);
175-
NKikimr::NMiniKQL::TValuePackerTransport<false> unpacker(typeMkql);
176-
NYql::NUdf::TUnboxedValue value = unpacker.Unpack(NYql::MakeChunkedBuffer(*serializedColumn), *HolderFactory);
177-
178-
UNIT_ASSERT_VALUES_EQUAL(expectedData, TString(value.AsStringRef()));
179-
}
180-
}
181-
182165
public:
183166
TVector<NActors::TActorId> ClientIds;
184167
TVector<TClientDataConsumer::TPtr> Clients;
@@ -199,8 +182,8 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
199182
[&](ui64 offset, TVector<std::shared_ptr<TRope>>&& data) {
200183
UNIT_ASSERT_VALUES_EQUAL(offset, firstOffset + 1);
201184
UNIT_ASSERT_VALUES_EQUAL(data.size(), 2);
202-
CheckStringColumn(data[0], "event2");
203-
CheckStringColumn(data[1], "str_first__large__");
185+
CheckStringColumn(*data[0], "event2");
186+
CheckStringColumn(*data[1], "str_first__large__");
204187
}
205188
));
206189

@@ -210,8 +193,8 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
210193
[&](ui64 offset, TVector<std::shared_ptr<TRope>>&& data) {
211194
UNIT_ASSERT_VALUES_EQUAL(offset, firstOffset);
212195
UNIT_ASSERT_VALUES_EQUAL(data.size(), 2);
213-
CheckStringColumn(data[0], "event1");
214-
CheckStringColumn(data[1], "str_second");
196+
CheckStringColumn(*data[0], "event1");
197+
CheckStringColumn(*data[1], "str_second");
215198
}
216199
));
217200

@@ -243,7 +226,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
243226
auto trueChacker = [&](ui64 offset, TVector<std::shared_ptr<TRope>>&& data) {
244227
UNIT_ASSERT(offset - firstOffset < testData.size());
245228
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
246-
CheckStringColumn(data[0], testData[offset - firstOffset]);
229+
CheckStringColumn(*data[0], testData[offset - firstOffset]);
247230
};
248231
CheckSuccess(MakeClient(schema, "WHERE TRUE", trueChacker, 3));
249232
CheckSuccess(MakeClient(schema, "", trueChacker, 2));
@@ -272,8 +255,8 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
272255
[&](ui64 offset, TVector<std::shared_ptr<TRope>>&& data) {
273256
UNIT_ASSERT_VALUES_EQUAL(offset, firstOffset);
274257
UNIT_ASSERT_VALUES_EQUAL(data.size(), 2);
275-
CheckStringColumn(data[0], "event2");
276-
CheckStringColumn(data[1], "str_first__large__");
258+
CheckStringColumn(*data[0], "event2");
259+
CheckStringColumn(*data[1], "str_first__large__");
277260
parsed = true;
278261
}
279262
));
@@ -336,8 +319,8 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
336319
[&](ui64 offset, TVector<std::shared_ptr<TRope>>&& data) {
337320
UNIT_ASSERT_VALUES_EQUAL(offset, firstOffset);
338321
UNIT_ASSERT_VALUES_EQUAL(data.size(), 2);
339-
CheckStringColumn(data[0], "event1");
340-
CheckStringColumn(data[1], "str_second");
322+
CheckStringColumn(*data[0], "event1");
323+
CheckStringColumn(*data[1], "str_second");
341324
}
342325
));
343326

ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ class TFiterFixture : public TBaseFixture {
2121
TFilterConsumer(const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback)
2222
: Columns(columns)
2323
, WhereFilter(whereFilter)
24-
, FactorySettings{.EnabledLLVM = false}
2524
, Callback(callback)
2625
{}
2726

@@ -34,8 +33,8 @@ class TFiterFixture : public TBaseFixture {
3433
return WhereFilter;
3534
}
3635

37-
const IPureCalcProgramFactory::TSettings& GetPureCalcSettings() const override {
38-
return FactorySettings;
36+
IPureCalcProgramFactory::TSettings GetPureCalcSettings() const override {
37+
return {.EnabledLLVM = false};
3938
}
4039

4140
virtual NActors::TActorId GetFilterId() const override {
@@ -57,7 +56,6 @@ class TFiterFixture : public TBaseFixture {
5756
private:
5857
const TVector<TSchemaColumn> Columns;
5958
const TString WhereFilter;
60-
const IPureCalcProgramFactory::TSettings FactorySettings;
6159
const TCallback Callback;
6260
};
6361

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
#include "topic_session.h"
22

3+
#include "common.h"
34
#include "format_handler/format_handler.h"
45

56
#include <ydb/core/fq/libs/actors/logging/log.h>
67

78
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
9+
#include <ydb/library/yql/dq/actors/dq.h>
810
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
911
#include <ydb/library/actors/core/actor_bootstrapped.h>
1012
#include <ydb/library/actors/core/hfunc.h>
@@ -97,8 +99,9 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
9799
struct TClientsInfo : public IClientDataConsumer {
98100
using TPtr = TIntrusivePtr<TClientsInfo>;
99101

100-
TClientsInfo(TTopicSession& self, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, NMonitoring::TDynamicCounterPtr& counters)
102+
TClientsInfo(TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, NMonitoring::TDynamicCounterPtr& counters)
101103
: Self(self)
104+
, LogPrefix(logPrefix)
102105
, HandlerSettings(handlerSettings)
103106
, Settings(ev->Get()->Record)
104107
, ReadActorId(ev->Sender)
@@ -150,6 +153,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
150153
for (const auto& columnData : data) {
151154
dataSize += columnData->GetSize();
152155
}
156+
LOG_ROW_DISPATCHER_TRACE("AddDataToClient to " << ReadActorId << ", offset: " << offset << ", schema size: " << data.size() << ", serialized size: " << dataSize);
153157

154158
NextMessageOffset = offset + 1;
155159
Buffer.emplace(offset, std::move(data));
@@ -159,10 +163,12 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
159163
}
160164

161165
void UpdateClinetOffset(ui64 offset) override {
166+
LOG_ROW_DISPATCHER_TRACE("UpdateClinetOffset for " << ReadActorId << ", new offset: " << offset);
162167
NextMessageOffset = offset + 1;
163168
}
164169

165170
TTopicSession& Self;
171+
const TString& LogPrefix;
166172
ITopicFormatHandler::TSettings HandlerSettings;
167173
NFq::NRowDispatcherProto::TEvStartSession Settings;
168174
NActors::TActorId ReadActorId;
@@ -554,9 +560,10 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionE
554560
}
555561

556562
void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TSessionClosedEvent& ev) {
557-
TString message = TStringBuilder() << "Read session to topic \"" << Self.TopicPathPartition << "\" was closed: " << ev.DebugString();
558-
LOG_ROW_DISPATCHER_DEBUG(message);
559-
Self.FatalError(TStatus(TStatus::EId::UNAVAILABLE, message));
563+
TString message = TStringBuilder() << "Read session to topic \"" << Self.TopicPath << "\" was closed";
564+
LOG_ROW_DISPATCHER_DEBUG(message << ": " << ev.DebugString());
565+
const auto statusCode = NYql::NDq::YdbStatusToDqStatus(static_cast<Ydb::StatusIds::StatusCode>(ev.GetStatus()));
566+
Self.FatalError(TStatus(statusCode, ev.GetIssues()).AddParentIssue(message));
560567
}
561568

562569
void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent& event) {
@@ -674,14 +681,11 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
674681

675682
auto queryGroup = Counters->GetSubgroup("queryId", ev->Get()->Record.GetQueryId());
676683
auto topicGroup = queryGroup->GetSubgroup("topic", CleanupCounterValueString(TopicPath));
677-
auto& clientInfo = Clients.emplace(
678-
std::piecewise_construct,
679-
std::forward_as_tuple(ev->Sender),
680-
std::forward_as_tuple(*this, handlerSettings, ev, topicGroup)).first->second;
684+
auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive<TClientsInfo>(*this, LogPrefix, handlerSettings, ev, topicGroup)}).first->second;
681685

682686
auto formatIt = FormatHandlers.find(handlerSettings);
683687
if (formatIt == FormatHandlers.end()) {
684-
FormatHandlers.insert({handlerSettings, CreateTopicFormatHandler(FormatHandlerConfig, handlerSettings)});
688+
formatIt = FormatHandlers.insert({handlerSettings, CreateTopicFormatHandler(FormatHandlerConfig, handlerSettings)}).first;
685689
}
686690

687691
if (auto status = formatIt->second->AddClient(clientInfo); !status.IsSuccess()) {
@@ -742,6 +746,7 @@ void TTopicSession::FatalError(TStatus status) {
742746
}
743747

744748
void TTopicSession::SendSessionError(NActors::TActorId readActorId, TStatus status) {
749+
LOG_ROW_DISPATCHER_WARN("SendSessionError to " << readActorId << ", status: " << status.ToString());
745750
auto event = std::make_unique<TEvRowDispatcher::TEvSessionError>();
746751
event->Record.SetStatusCode(status.GetStatus());
747752
NYql::IssuesToMessage(status.GetIssues(), event->Record.MutableIssues());

0 commit comments

Comments
 (0)