Skip to content

Commit b6ac10c

Browse files
committed
YQ-3841 RD add column types validation (ydb-platform#11487)
1 parent f939529 commit b6ac10c

File tree

2 files changed

+51
-9
lines changed

2 files changed

+51
-9
lines changed

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
142142
TParserInputType InputType;
143143
};
144144

145+
struct TFieldDescription {
146+
ui64 IndexInParserSchema = 0;
147+
TString Type;
148+
};
149+
145150
bool InflightReconnect = false;
146151
TDuration ReconnectPeriod;
147152
const TString TopicPath;
@@ -168,7 +173,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
168173
const ::NMonitoring::TDynamicCounterPtr Counters;
169174
TTopicSessionMetrics Metrics;
170175
TParserSchema ParserSchema;
171-
THashMap<TString, ui64> FieldsIndexes;
176+
THashMap<TString, TFieldDescription> FieldsIndexes;
172177
NYql::IPqGateway::TPtr PqGateway;
173178
TMaybe<TString> ConsumerName;
174179

@@ -683,14 +688,16 @@ void TTopicSession::SendData(TClientsInfo& info) {
683688
}
684689

685690
void TTopicSession::UpdateFieldsIds(TClientsInfo& info) {
686-
for (auto name : info.Settings.GetSource().GetColumns()) {
691+
const auto& source = info.Settings.GetSource();
692+
for (size_t i = 0; i < source.ColumnsSize(); ++i) {
693+
const auto& name = source.GetColumns().Get(i);
687694
auto it = FieldsIndexes.find(name);
688695
if (it == FieldsIndexes.end()) {
689696
auto nextIndex = FieldsIndexes.size();
690697
info.FieldsIds.push_back(nextIndex);
691-
FieldsIndexes[name] = nextIndex;
698+
FieldsIndexes[name] = {nextIndex, source.GetColumnTypes().Get(i)};
692699
} else {
693-
info.FieldsIds.push_back(it->second);
700+
info.FieldsIds.push_back(it->second.IndexInParserSchema);
694701
}
695702
}
696703
}
@@ -816,7 +823,7 @@ void TTopicSession::UpdateParserSchema(const TParserInputType& inputType) {
816823
ui64 offset = 0;
817824
for (const auto& [name, type]: inputType) {
818825
Y_ENSURE(FieldsIndexes.contains(name));
819-
ui64 index = FieldsIndexes[name];
826+
ui64 index = FieldsIndexes[name].IndexInParserSchema;
820827
ParserSchema.FieldsMap[index] = offset++;
821828
}
822829
ParserSchema.InputType = inputType;
@@ -944,13 +951,26 @@ bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr&
944951
SendSessionError(ev->Sender, "Internal error: such a client already exists");
945952
return false;
946953
}
947-
if (!Config.GetWithoutConsumer()
948-
&& ConsumerName
949-
&& ConsumerName != ev->Get()->Record.GetSource().GetConsumerName()) {
950-
LOG_ROW_DISPATCHER_INFO("Different consumer, expected " << ConsumerName << ", actual " << ev->Get()->Record.GetSource().GetConsumerName() << ", send error");
954+
955+
const auto& source = ev->Get()->Record.GetSource();
956+
if (!Config.GetWithoutConsumer() && ConsumerName && ConsumerName != source.GetConsumerName()) {
957+
LOG_ROW_DISPATCHER_INFO("Different consumer, expected " << ConsumerName << ", actual " << source.GetConsumerName() << ", send error");
951958
SendSessionError(ev->Sender, TStringBuilder() << "Use the same consumer in all queries via RD (current consumer " << ConsumerName << ")");
952959
return false;
953960
}
961+
962+
Y_ENSURE(source.ColumnsSize() == source.ColumnTypesSize());
963+
for (size_t i = 0; i < source.ColumnsSize(); ++i) {
964+
const auto& name = source.GetColumns().Get(i);
965+
const auto& type = source.GetColumnTypes().Get(i);
966+
const auto it = FieldsIndexes.find(name);
967+
if (it != FieldsIndexes.end() && it->second.Type != type) {
968+
LOG_ROW_DISPATCHER_INFO("Different column `" << name << "` type, expected " << it->second.Type << ", actual " << type << ", send error");
969+
SendSessionError(ev->Sender, TStringBuilder() << "Use the same column type in all queries via RD, current type for column `" << name << "` is " << it->second.Type << " (requested type is " << type <<")");
970+
return false;
971+
}
972+
}
973+
954974
return true;
955975
}
956976

ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,28 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
435435
StopSession(ReadActorId1, source1);
436436
StopSession(ReadActorId2, source2);
437437
}
438+
439+
Y_UNIT_TEST_F(TwoSessionsWithDifferentColumnTypes, TFixture) {
440+
const TString topicName = "dif_types";
441+
PQCreateStream(topicName);
442+
Init(topicName);
443+
444+
auto source1 = BuildSource(topicName);
445+
source1.AddColumns("field1");
446+
source1.AddColumnTypes("[OptionalType; [DataType; String]]");
447+
StartSession(ReadActorId1, source1);
448+
449+
TString json1 = "{\"dt\":101,\"field1\":null,\"value\":\"value1\"}";
450+
PQWrite({ json1 }, topicName);
451+
ExpectNewDataArrived({ReadActorId1});
452+
ExpectMessageBatch(ReadActorId1, { json1 });
453+
454+
auto source2 = BuildSource(topicName);
455+
source2.AddColumns("field1");
456+
source2.AddColumnTypes("[DataType; String]");
457+
StartSession(ReadActorId2, source2);
458+
ExpectSessionError(ReadActorId2, "Use the same column type in all queries via RD, current type for column `field1` is [OptionalType; [DataType; String]] (requested type is [DataType; String])");
459+
}
438460
}
439461

440462
}

0 commit comments

Comments
 (0)