Skip to content

Commit 2073f27

Browse files
YQ: sink to stable (#11215)
Co-authored-by: Pisarenko Grigoriy <grigoriypisar@ydb.tech>
1 parent ffa30dd commit 2073f27

File tree

7 files changed

+82
-16
lines changed

7 files changed

+82
-16
lines changed

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
9797
private:
9898
using TParserInputType = TSet<std::pair<TString, TString>>;
9999

100-
struct ClientsInfo {
101-
ClientsInfo(const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev)
100+
struct TClientsInfo {
101+
TClientsInfo(const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev)
102102
: Settings(ev->Get()->Record)
103103
, ReadActorId(ev->Sender)
104104
{
@@ -151,7 +151,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
151151
ui64 LastMessageOffset = 0;
152152
bool IsWaitingEvents = false;
153153
bool IsStartParsingScheduled = false;
154-
THashMap<NActors::TActorId, ClientsInfo> Clients;
154+
THashMap<NActors::TActorId, TClientsInfo> Clients;
155155
THashSet<NActors::TActorId> ClientsWithoutPredicate;
156156
std::unique_ptr<TJsonParser> Parser;
157157
NConfig::TRowDispatcherConfig Config;
@@ -190,15 +190,15 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
190190
void SendToParsing(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages);
191191
void DoParsing(bool force = false);
192192
void DoFiltering(const TVector<ui64>& offsets, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues);
193-
void SendData(ClientsInfo& info);
193+
void SendData(TClientsInfo& info);
194194
void UpdateParser();
195195
void FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter = nullptr);
196-
void SendDataArrived(ClientsInfo& client);
196+
void SendDataArrived(TClientsInfo& client);
197197
void StopReadSession();
198198
TString GetSessionId() const;
199199
void HandleNewEvents();
200200
TInstant GetMinStartingMessageTimestamp() const;
201-
void AddDataToClient(ClientsInfo& client, ui64 offset, const TString& json);
201+
void AddDataToClient(TClientsInfo& client, ui64 offset, const TString& json);
202202

203203
std::pair<NYql::NUdf::TUnboxedValuePod, i64> CreateItem(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message);
204204

@@ -215,9 +215,9 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
215215

216216
void SendStatistic();
217217
void SendSessionError(NActors::TActorId readActorId, const TString& message);
218-
TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*> RebuildJson(const ClientsInfo& info, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues);
218+
TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*> RebuildJson(const TClientsInfo& info, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues);
219219
void UpdateParserSchema(const TParserInputType& inputType);
220-
void UpdateFieldsIds(ClientsInfo& clientInfo);
220+
void UpdateFieldsIds(TClientsInfo& clientInfo);
221221

222222
private:
223223

@@ -387,7 +387,7 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&) {
387387
CreateTopicSession();
388388
}
389389

390-
TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*> TTopicSession::RebuildJson(const ClientsInfo& info, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues) {
390+
TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*> TTopicSession::RebuildJson(const TClientsInfo& info, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues) {
391391
TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*> result;
392392
const auto& offsets = ParserSchema.FieldsMap;
393393
result.reserve(info.FieldsIds.size());
@@ -601,7 +601,7 @@ void TTopicSession::DoFiltering(const TVector<ui64>& offsets, const TVector<NKik
601601
Send(SelfId(), new TEvPrivate::TEvDataFiltered(offsets.back()));
602602
}
603603

604-
void TTopicSession::SendData(ClientsInfo& info) {
604+
void TTopicSession::SendData(TClientsInfo& info) {
605605
info.DataArrivedSent = false;
606606
if (info.Buffer.empty()) {
607607
LOG_ROW_DISPATCHER_TRACE("Buffer empty");
@@ -639,7 +639,7 @@ void TTopicSession::SendData(ClientsInfo& info) {
639639
info.LastSendedNextMessageOffset = *info.NextMessageOffset;
640640
}
641641

642-
void TTopicSession::UpdateFieldsIds(ClientsInfo& info) {
642+
void TTopicSession::UpdateFieldsIds(TClientsInfo& info) {
643643
for (auto name : info.Settings.GetSource().GetColumns()) {
644644
auto it = FieldsIndexes.find(name);
645645
if (it == FieldsIndexes.end()) {
@@ -652,6 +652,15 @@ void TTopicSession::UpdateFieldsIds(ClientsInfo& info) {
652652
}
653653
}
654654

655+
// Tells whether at least one entry in the source's column type list
// contains the text "Json" (as decided by the entry's Contains method).
// Returns false when the list is empty or no entry matches.
bool HasJsonColumns(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) {
    const auto& columnTypes = sourceParams.GetColumnTypes();
    auto current = columnTypes.begin();
    const auto last = columnTypes.end();
    while (current != last) {
        if (current->Contains("Json")) {
            return true;
        }
        ++current;
    }
    return false;
}
663+
655664
void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
656665
auto it = Clients.find(ev->Sender);
657666
if (it != Clients.end()) {
@@ -678,6 +687,12 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
678687
UpdateFieldsIds(clientInfo);
679688

680689
TString predicate = clientInfo.Settings.GetSource().GetPredicate();
690+
691+
// TODO: remove this when the re-parsing is removed from pq read actor
692+
if (predicate.empty() && HasJsonColumns(clientInfo.Settings.GetSource())) {
693+
predicate = "WHERE TRUE";
694+
}
695+
681696
if (!predicate.empty()) {
682697
clientInfo.Filter = NewJsonFilter(
683698
columns,
@@ -710,7 +725,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
710725
}
711726
}
712727

713-
void TTopicSession::AddDataToClient(ClientsInfo& info, ui64 offset, const TString& json) {
728+
void TTopicSession::AddDataToClient(TClientsInfo& info, ui64 offset, const TString& json) {
714729
if (info.NextMessageOffset && offset < info.NextMessageOffset) {
715730
return;
716731
}
@@ -836,7 +851,7 @@ void TTopicSession::StopReadSession() {
836851
TopicClient.Reset();
837852
}
838853

839-
void TTopicSession::SendDataArrived(ClientsInfo& info) {
854+
void TTopicSession::SendDataArrived(TClientsInfo& info) {
840855
if (info.Buffer.empty() || info.DataArrivedSent) {
841856
return;
842857
}

ydb/library/yql/providers/common/pushdown/collection.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ bool IsSupportedPredicate(const TCoCompare& predicate, const TSettings& settings
5656
return true;
5757
} else if (settings.IsEnabled(TSettings::EFeatureFlag::LikeOperator) && IsLikeOperator(predicate)) {
5858
return true;
59+
} else if (predicate.Maybe<TCoAggrEqual>()) {
60+
return true;
61+
} else if (predicate.Maybe<TCoAggrNotEqual>()) {
62+
return true;
5963
}
6064

6165
return false;

ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,8 @@ message TPredicate {
410410
NE = 4; // "$column != value"
411411
GE = 5; // "$column >= value"
412412
G = 6; // "$column > value"
413+
IND = 7; // "$column IS NOT DISTINCT FROM value"
414+
ID = 8; // "$column IS DISTINCT FROM value"
413415
}
414416

415417
EOperation operation = 1;

ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ namespace NYql {
112112
EXPR_NODE_TO_COMPARE_TYPE(TCoCmpLessOrEqual, LE);
113113
EXPR_NODE_TO_COMPARE_TYPE(TCoCmpGreater, G);
114114
EXPR_NODE_TO_COMPARE_TYPE(TCoCmpGreaterOrEqual, GE);
115+
EXPR_NODE_TO_COMPARE_TYPE(TCoAggrEqual, IND);
116+
EXPR_NODE_TO_COMPARE_TYPE(TCoAggrNotEqual, ID);
115117

116118
if (proto->operation() == TPredicate::TComparison::COMPARISON_OPERATION_UNSPECIFIED) {
117119
err << "unknown operation: " << compare.Raw()->Content();
@@ -383,6 +385,12 @@ namespace NYql {
383385
case TPredicate_TComparison::G:
384386
operation = " > ";
385387
break;
388+
case TPredicate_TComparison::IND:
389+
operation = " IS NOT DISTINCT FROM ";
390+
break;
391+
case TPredicate_TComparison::ID:
392+
operation = " IS DISTINCT FROM ";
393+
break;
386394
default:
387395
throw yexception() << "UnimplementedOperation, operation " << static_cast<ui64>(comparison.operation());
388396
}

ydb/library/yql/providers/generic/pushdown/yql_generic_match_predicate.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ namespace NYql::NGenericPushDown {
115115
return rightValueTimestamp < timestampStatistics.highValue ? Triple::True : Triple::False;
116116
case ::NYql::NConnector::NApi::TPredicate::TComparison::NE:
117117
return rightValueTimestamp < timestampStatistics.lowValue || timestampStatistics.highValue < rightValueTimestamp ? Triple::True : Triple::False;
118+
case ::NYql::NConnector::NApi::TPredicate::TComparison::IND:
119+
case ::NYql::NConnector::NApi::TPredicate::TComparison::ID:
118120
case ::NYql::NConnector::NApi::TPredicate::TComparison::COMPARISON_OPERATION_UNSPECIFIED:
119121
case ::NYql::NConnector::NApi::TPredicate_TComparison_EOperation_TPredicate_TComparison_EOperation_INT_MIN_SENTINEL_DO_NOT_USE_:
120122
case ::NYql::NConnector::NApi::TPredicate_TComparison_EOperation_TPredicate_TComparison_EOperation_INT_MAX_SENTINEL_DO_NOT_USE_:
@@ -147,6 +149,8 @@ namespace NYql::NGenericPushDown {
147149
return timestampStatistics.lowValue < leftValueTimestamp ? Triple::True : Triple::False;
148150
case ::NYql::NConnector::NApi::TPredicate::TComparison::NE:
149151
return leftValueTimestamp < timestampStatistics.lowValue || timestampStatistics.highValue < leftValueTimestamp ? Triple::True : Triple::False;
152+
case ::NYql::NConnector::NApi::TPredicate::TComparison::IND:
153+
case ::NYql::NConnector::NApi::TPredicate::TComparison::ID:
150154
case ::NYql::NConnector::NApi::TPredicate::TComparison::COMPARISON_OPERATION_UNSPECIFIED:
151155
case ::NYql::NConnector::NApi::TPredicate_TComparison_EOperation_TPredicate_TComparison_EOperation_INT_MIN_SENTINEL_DO_NOT_USE_:
152156
case ::NYql::NConnector::NApi::TPredicate_TComparison_EOperation_TPredicate_TComparison_EOperation_INT_MAX_SENTINEL_DO_NOT_USE_:

ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ namespace {
2727
: NPushdown::TSettings(NLog::EComponent::ProviderGeneric)
2828
{
2929
using EFlag = NPushdown::TSettings::EFeatureFlag;
30-
Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator);
30+
Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes);
3131
}
3232
};
3333

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,36 @@ def test_nested_types(self, kikimr, client):
253253
issues = str(client.describe_query(query_id).result.query.transient_issue)
254254
assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues
255255

256+
# Test: a query that selects a Json column from a shared-reading YDS source
# with no WHERE clause. It starts the query, waits for one
# FQ_ROW_DISPATCHER_SESSION actor, writes two JSON messages whose "data" field
# is an object and an array, and asserts the output topic holds exactly those
# "data" values. It then waits for a DQ_PQ_READ_ACTOR count of 1 and stops
# the query.
@yq_v1
257+
def test_nested_types_without_predicate(self, kikimr, client):
258+
client.create_yds_connection(
259+
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
260+
)
261+
self.init_topics("test_nested_types_without_predicate")
262+
263+
sql = Rf'''
264+
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
265+
SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}`
266+
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data Json NOT NULL, event String NOT NULL));'''
267+
268+
query_id = start_yds_query(kikimr, client, sql)
269+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
270+
271+
data = [
272+
'{"time": 101, "data": {"key": "value"}, "event": "event1"}',
273+
'{"time": 102, "data": ["key1", "key2"], "event": "event2"}'
274+
]
275+
276+
self.write_stream(data)
277+
expected = [
278+
'{"key": "value"}',
279+
'["key1", "key2"]'
280+
]
281+
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
282+
283+
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
284+
stop_yds_query(client, query_id)
285+
256286
@yq_v1
257287
def test_filter(self, kikimr, client):
258288
client.create_yds_connection(
@@ -264,7 +294,10 @@ def test_filter(self, kikimr, client):
264294
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
265295
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
266296
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String NOT NULL, event String NOT NULL))
267-
WHERE time > 101UL or event = "event666";'''
297+
WHERE time > 101 and
298+
data = "hello2" and
299+
event IS NOT DISTINCT FROM "event2" and
300+
event IS DISTINCT FROM "event1";'''
268301

269302
query_id = start_yds_query(kikimr, client, sql)
270303
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
@@ -544,7 +577,7 @@ def test_stop_start_with_filter(self, kikimr, client):
544577
INSERT INTO {YDS_CONNECTION}.`{output_topic}`
545578
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
546579
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL))
547-
WHERE time > 200UL;'''
580+
WHERE time > 200;'''
548581

549582
query_id = start_yds_query(kikimr, client, sql)
550583
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)

0 commit comments

Comments
 (0)