Skip to content

Commit 8a05ebd

Browse files
authored
YQ-3594 processing queries with different schemes (#9981)
1 parent 4e677a5 commit 8a05ebd

File tree

3 files changed

+166
-50
lines changed

3 files changed

+166
-50
lines changed

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 107 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
1111
#include <ydb/library/yql/dq/runtime/dq_async_stats.h>
1212
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
13+
14+
#include <util/string/join.h>
1315
#include <util/generic/queue.h>
1416

1517
#include <ydb/core/fq/libs/row_dispatcher/json_parser.h>
@@ -62,13 +64,11 @@ struct TEvPrivate {
6264
struct TEvCreateSession : public NActors::TEventLocal<TEvCreateSession, EvCreateSession> {};
6365
struct TEvPrintState : public NActors::TEventLocal<TEvPrintState, EvPrintState> {};
6466
struct TEvStatus : public NActors::TEventLocal<TEvStatus, EvStatus> {};
65-
6667
// Local actor event that carries the two values given to its constructor:
// Offset and NumberValues. Both members are const, so an instance cannot be
// modified after construction.
struct TEvDataFiltered : public NActors::TEventLocal<TEvDataFiltered, EvDataFiltered> {
6768
TEvDataFiltered(ui64 offset, ui64 numberValues)
6869
: Offset(offset)
6970
, NumberValues(numberValues)
7071
{}
71-
7272
const ui64 Offset;
7373
const ui64 NumberValues;
7474
};
@@ -95,7 +95,7 @@ TVector<TString> GetVector(const google::protobuf::RepeatedPtrField<TString>& va
9595
class TTopicSession : public TActorBootstrapped<TTopicSession> {
9696

9797
private:
98-
using TParserInputType = std::pair< TVector<TString>, TVector<TString>>; // TODO: remove after YQ-3594
98+
using TParserInputType = TSet<std::pair<TString, TString>>;
9999

100100
struct ClientsInfo {
101101
ClientsInfo(const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev)
@@ -114,6 +114,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
114114
bool DataArrivedSent = false;
115115
TMaybe<ui64> NextMessageOffset;
116116
ui64 LastSendedNextMessageOffset = 0;
117+
TVector<ui64> FieldsIds;
117118
};
118119

119120
struct TTopicEventProcessor {
@@ -130,6 +131,11 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
130131
const TString& LogPrefix;
131132
};
132133

134+
// Describes the set of columns currently handed to the parser.
// FieldsMap translates a field id (a value stored in FieldsIndexes) into the
// position of that field in the parsing schema; InputType is the
// (column name, column type) set the mapping was built from.
struct TParserSchema {
135+
TVector<ui64> FieldsMap; // index - FieldId (from FieldsIndexes), value - parsing schema offset
136+
TParserInputType InputType;
137+
};
138+
133139
const TString TopicPath;
134140
NActors::TActorId RowDispatcherActorId;
135141
ui32 PartitionId;
@@ -147,9 +153,10 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
147153
std::unique_ptr<TJsonParser> Parser;
148154
NConfig::TRowDispatcherConfig Config;
149155
ui64 UsedSize = 0;
150-
TMaybe<TParserInputType> CurrentParserTypes;
151156
const ::NMonitoring::TDynamicCounterPtr Counters;
152157
TTopicSessionMetrics Metrics;
158+
TParserSchema ParserSchema;
159+
THashMap<TString, ui64> FieldsIndexes;
153160

154161
public:
155162
explicit TTopicSession(
@@ -176,7 +183,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
176183
void SendToParsing(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages);
177184
void SendToFiltering(ui64 offset, const TVector<TVector<std::string_view>>& parsedValues);
178185
void SendData(ClientsInfo& info);
179-
void InitParser(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams);
186+
void UpdateParser();
180187
void FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter = nullptr);
181188
void SendDataArrived(ClientsInfo& client);
182189
void StopReadSession();
@@ -200,6 +207,9 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
200207

201208
void PrintInternalState();
202209
void SendSessionError(NActors::TActorId readActorId, const TString& message);
210+
TVector<TVector<std::string_view>> RebuildJson(const ClientsInfo& info, const TVector<TVector<std::string_view>>& parsedValues);
211+
void UpdateParserSchema(const TParserInputType& inputType);
212+
void UpdateFieldsIds(ClientsInfo& clientInfo);
203213

204214
private:
205215

@@ -340,11 +350,11 @@ void TTopicSession::CreateTopicSession() {
340350
return;
341351
}
342352

343-
// Use any sourceParams.
344-
const NYql::NPq::NProto::TDqPqTopicSource& sourceParams = Clients.begin()->second.Settings.GetSource();
345-
346353
if (!ReadSession) {
347-
InitParser(sourceParams);
354+
UpdateParser();
355+
356+
// Use any sourceParams.
357+
const NYql::NPq::NProto::TDqPqTopicSource& sourceParams = Clients.begin()->second.Settings.GetSource();
348358
ReadSession = GetTopicClient(sourceParams).CreateReadSession(GetReadSessionSettings(sourceParams));
349359
SubscribeOnNextEvent();
350360
}
@@ -362,8 +372,21 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&) {
362372
CreateTopicSession();
363373
}
364374

375+
// Produces the subset of parsed columns that a particular client asked for.
//
// For every field id in info.FieldsIds (in that order) the id is translated
// through ParserSchema.FieldsMap into a position inside parsedValues, and the
// column found at that position is copied into the result.
//
// Throws (via Y_ENSURE) when a field id is outside FieldsMap or when the
// translated position is outside parsedValues.
TVector<TVector<std::string_view>> TTopicSession::RebuildJson(const ClientsInfo& info, const TVector<TVector<std::string_view>>& parsedValues) {
    const auto& fieldToColumn = ParserSchema.FieldsMap;

    TVector<TVector<std::string_view>> clientColumns;
    clientColumns.reserve(info.FieldsIds.size());

    for (const ui64 fieldId : info.FieldsIds) {
        Y_ENSURE(fieldId < fieldToColumn.size(), "fieldId " << fieldId << ", offsets.size() " << fieldToColumn.size());
        const ui64 column = fieldToColumn[fieldId];
        Y_ENSURE(column < parsedValues.size(), "offset " << column << ", jsonBatch.size() " << parsedValues.size());
        clientColumns.push_back(parsedValues[column]);
    }

    return clientColumns;
}
387+
365388
void TTopicSession::Handle(NFq::TEvPrivate::TEvDataAfterFilteration::TPtr& ev) {
366-
LOG_ROW_DISPATCHER_TRACE("TEvDataAfterFilteration, read actor id " << ev->Get()->ReadActorId.ToString());
389+
LOG_ROW_DISPATCHER_TRACE("TEvDataAfterFilteration, read actor id " << ev->Get()->ReadActorId.ToString() << ", " << ev->Get()->Json);
367390
auto it = Clients.find(ev->Get()->ReadActorId);
368391
if (it == Clients.end()) {
369392
LOG_ROW_DISPATCHER_ERROR("Skip DataAfterFilteration, wrong read actor, id " << ev->Get()->ReadActorId.ToString());
@@ -537,12 +560,12 @@ void TTopicSession::SendToParsing(const TVector<NYdb::NTopic::TReadSessionEvent:
537560

538561
void TTopicSession::SendToFiltering(ui64 offset, const TVector<TVector<std::string_view>>& parsedValues) {
539562
Y_ENSURE(parsedValues, "Expected non empty schema");
540-
LOG_ROW_DISPATCHER_TRACE("TEvDataParsed, offset " << offset << ", data:\n" << Parser->GetDebugString(parsedValues));
563+
LOG_ROW_DISPATCHER_TRACE("SendToFiltering, offset " << offset << ", data:\n" << Parser->GetDebugString(parsedValues));
541564

542565
for (auto& [actorId, info] : Clients) {
543566
try {
544567
if (info.Filter) {
545-
info.Filter->Push(offset, parsedValues);
568+
info.Filter->Push(offset, RebuildJson(info, parsedValues));
546569
}
547570
} catch (const std::exception& e) {
548571
FatalError(e.what(), &info.Filter);
@@ -590,28 +613,38 @@ void TTopicSession::SendData(ClientsInfo& info) {
590613
info.LastSendedNextMessageOffset = *info.NextMessageOffset;
591614
}
592615

616+
// Resolves every column requested by the client into a session-wide field id
// and appends those ids to info.FieldsIds, preserving the column order.
//
// A column name that is already registered in FieldsIndexes reuses its id; a
// new name is registered with the next free id, which is the number of entries
// FieldsIndexes held before the insertion.
void TTopicSession::UpdateFieldsIds(ClientsInfo& info) {
    const auto& columns = info.Settings.GetSource().GetColumns();
    info.FieldsIds.reserve(info.FieldsIds.size() + columns.size());

    for (const auto& name : columns) {
        // emplace() keeps an existing entry untouched and returns an iterator to
        // it, so a single hash lookup covers both the "known" and "new" cases.
        // The size() argument is evaluated before the insertion takes place.
        const auto it = FieldsIndexes.emplace(name, FieldsIndexes.size()).first;
        info.FieldsIds.push_back(it->second);
    }
}
628+
593629
void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
594630
auto it = Clients.find(ev->Sender);
595631
if (it != Clients.end()) {
596632
FatalError("Internal error: sender " + ev->Sender.ToString());
597633
return;
598634
}
599635

600-
LOG_ROW_DISPATCHER_INFO("New client, read actor id " << ev->Sender.ToString());
636+
LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: "
637+
<< ev->Get()->Record.GetSource().GetPredicate() << ", offset: " << ev->Get()->Record.GetOffset());
601638

602639
auto columns = GetVector(ev->Get()->Record.GetSource().GetColumns());
603640
auto types = GetVector(ev->Get()->Record.GetSource().GetColumnTypes());
604-
auto parserType = std::make_pair(columns, types);
605-
if (CurrentParserTypes && *CurrentParserTypes != parserType) {
606-
SendSessionError(ev->Sender, "Different columns/types, use same in all queries");
607-
return;
608-
}
609641

610642
try {
611643
auto& clientInfo = Clients.emplace(
612644
std::piecewise_construct,
613645
std::forward_as_tuple(ev->Sender),
614646
std::forward_as_tuple(ev)).first->second;
647+
UpdateFieldsIds(clientInfo);
615648

616649
TString predicate = clientInfo.Settings.GetSource().GetPredicate();
617650
if (!predicate.empty()) {
@@ -626,11 +659,9 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
626659
ClientsWithoutPredicate.insert(ev->Sender);
627660
}
628661

629-
LOG_ROW_DISPATCHER_INFO("New client: offset " << clientInfo.NextMessageOffset << ", predicate: " << clientInfo.Settings.GetSource().GetPredicate());
630-
631662
if (ReadSession) {
632663
if (clientInfo.Settings.HasOffset() && (clientInfo.Settings.GetOffset() <= LastMessageOffset)) {
633-
LOG_ROW_DISPATCHER_INFO("New client has less offset than the last message, stop (restart) topic session");
664+
LOG_ROW_DISPATCHER_INFO("New client has less offset (" << clientInfo.Settings.GetOffset() << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session");
634665
StopReadSession();
635666
}
636667
}
@@ -641,7 +672,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
641672
} catch (...) {
642673
FatalError("Adding new client failed, " + CurrentExceptionMessage());
643674
}
644-
675+
UpdateParser();
645676
PrintInternalState();
646677
if (!ReadSession) {
647678
Schedule(TDuration::Seconds(Config.GetTimeoutBeforeStartSessionSec()), new NFq::TEvPrivate::TEvCreateSession());
@@ -665,20 +696,69 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
665696

666697
auto it = Clients.find(ev->Sender);
667698
if (it == Clients.end()) {
668-
LOG_ROW_DISPATCHER_DEBUG("Wrong ClientSettings"); // TODO
699+
LOG_ROW_DISPATCHER_DEBUG("Wrong ClientSettings");
669700
return;
670701
}
671702
Clients.erase(it);
672703
ClientsWithoutPredicate.erase(ev->Sender);
704+
if (Clients.empty()) {
705+
StopReadSession();
706+
}
707+
UpdateParser();
673708
}
674709

675-
void TTopicSession::InitParser(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) {
676-
if (Parser) {
710+
// Adds every (column name, column type) pair declared by sourceParams to
// `columns`. Pairs that are already present in the set are left as they are.
//
// Throws (via Y_ENSURE) when the source declares a different number of column
// names and column types.
void CollectColumns(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams, TSet<std::pair<TString, TString>>& columns) {
    // Bind the repeated fields once instead of re-fetching them per iteration.
    const auto& names = sourceParams.GetColumns();
    const auto& types = sourceParams.GetColumnTypes();

    const auto size = names.size();
    Y_ENSURE(size == types.size(),
        "Columns count " << size << " differs from column types count " << types.size());

    for (int i = 0; i < size; ++i) {
        // Pass references straight to emplace: the only copies made are the
        // ones stored in the set.
        columns.emplace(names.Get(i), types.Get(i));
    }
}
720+
721+
void TTopicSession::UpdateParserSchema(const TParserInputType& inputType) {
722+
ParserSchema.FieldsMap.clear();
723+
ParserSchema.FieldsMap.resize(FieldsIndexes.size());
724+
ui64 offset = 0;
725+
for (const auto& [name, type]: inputType) {
726+
Y_ENSURE(FieldsIndexes.contains(name));
727+
ui64 index = FieldsIndexes[name];
728+
ParserSchema.FieldsMap[index] = offset++;
729+
}
730+
ParserSchema.InputType = inputType;
731+
}
732+
733+
void TTopicSession::UpdateParser() {
734+
TSet<std::pair<TString, TString>> namesWithTypes;
735+
for (auto& [readActorId, info] : Clients) {
736+
CollectColumns(info.Settings.GetSource(), namesWithTypes);
737+
}
738+
739+
if (namesWithTypes == ParserSchema.InputType) {
677740
return;
678741
}
742+
if (namesWithTypes.empty()) {
743+
LOG_ROW_DISPATCHER_INFO("No columns to parse, reset parser");
744+
Parser.reset();
745+
return;
746+
}
747+
679748
try {
680-
CurrentParserTypes = std::make_pair(GetVector(sourceParams.GetColumns()), GetVector(sourceParams.GetColumnTypes()));
681-
Parser = NewJsonParser(GetVector(sourceParams.GetColumns()), GetVector(sourceParams.GetColumnTypes()));
749+
UpdateParserSchema(namesWithTypes);
750+
751+
TVector<TString> names;
752+
TVector<TString> types;
753+
names.reserve(namesWithTypes.size());
754+
types.reserve(namesWithTypes.size());
755+
for (const auto& [name, type] : namesWithTypes) {
756+
names.push_back(name);
757+
types.push_back(type);
758+
}
759+
760+
LOG_ROW_DISPATCHER_TRACE("Init JsonParser with columns: " << JoinSeq(',', names));
761+
Parser = NewJsonParser(names, types);
682762
} catch (const NYql::NPureCalc::TCompileError& e) {
683763
FatalError(e.GetIssues());
684764
}

0 commit comments

Comments
 (0)