Skip to content

Commit f8bebff

Browse files
authored
fix query multiple rowsets (#8738)
1 parent f9b4a7d commit f8bebff

File tree

2 files changed

+72
-61
lines changed

2 files changed

+72
-61
lines changed

ydb/core/viewer/json_pipe_req.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,12 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
150150
TString GetError() const {
151151
return std::get<TString>(Response);
152152
}
153+
154+
void Event(const TString& name) {
155+
if (Span) {
156+
Span.Event(name);
157+
}
158+
}
153159
};
154160

155161
NTabletPipe::TClientConfig GetPipeClientConfig();

ydb/core/viewer/viewer_query.h

Lines changed: 66 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class TJsonQuery : public TViewerPipeClient {
2121
using TBase = TViewerPipeClient;
2222
TJsonSettings JsonSettings;
2323
ui32 Timeout = 0;
24-
TVector<Ydb::ResultSet> ResultSets;
24+
std::vector<std::vector<Ydb::ResultSet>> ResultSets;
2525
TString Query;
2626
TString Database;
2727
TString Action;
@@ -177,11 +177,9 @@ class TJsonQuery : public TViewerPipeClient {
177177
if (SessionId) {
178178
auto event = std::make_unique<NKqp::TEvKqp::TEvCloseSessionRequest>();
179179
event->Record.MutableRequest()->SetSessionId(SessionId);
180-
BLOG_TRACE("Closing session " << SessionId);
181180
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.release());
182181
}
183182
TBase::PassAway();
184-
BLOG_TRACE("PassAway()");
185183
}
186184

187185
void ReplyAndPassAway() override {
@@ -227,7 +225,6 @@ class TJsonQuery : public TViewerPipeClient {
227225
Span.Attribute("database", Database);
228226
}
229227
}
230-
BLOG_TRACE("Creating session");
231228
CreateSessionResponse = MakeRequest<NKqp::TEvKqp::TEvCreateSessionResponse>(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.release());
232229
}
233230

@@ -257,7 +254,6 @@ class TJsonQuery : public TViewerPipeClient {
257254
TStringBuilder() << "Failed to create session, error " << ev->Get()->Record.GetYdbStatus()), "InternalError");
258255
}
259256
SessionId = CreateSessionResponse->Record.GetResponse().GetSessionId();
260-
BLOG_TRACE("Session created " << SessionId);
261257

262258
{
263259
auto event = std::make_unique<NKqp::TEvKqp::TEvPingSessionRequest>();
@@ -332,7 +328,6 @@ class TJsonQuery : public TViewerPipeClient {
332328
}
333329
ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
334330
QueryResponse = MakeRequest<NKqp::TEvKqp::TEvQueryResponse>(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
335-
BLOG_TRACE("Query sent");
336331
}
337332

338333
private:
@@ -432,7 +427,6 @@ class TJsonQuery : public TViewerPipeClient {
432427
}
433428

434429
void HandleReply(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
435-
BLOG_TRACE("Query response received");
436430
NJson::TJsonValue jsonResponse;
437431
jsonResponse["version"] = Viewer->GetCapabilityVersion("/viewer/query");
438432
if (ev->Get()->Record.GetRef().GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
@@ -491,6 +485,7 @@ class TJsonQuery : public TViewerPipeClient {
491485
}
492486

493487
void HandleReply(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
488+
QueryResponse.Event("StreamData");
494489
NKikimrKqp::TEvExecuterStreamData& data(ev->Get()->Record);
495490

496491
if (TotalRows < LimitRows) {
@@ -500,7 +495,10 @@ class TJsonQuery : public TViewerPipeClient {
500495
data.MutableResultSet()->set_truncated(true);
501496
}
502497
TotalRows += data.GetResultSet().rows_size();
503-
ResultSets.emplace_back() = std::move(*data.MutableResultSet());
498+
if (ResultSets.size() <= data.GetQueryResultIndex()) {
499+
ResultSets.resize(data.GetQueryResultIndex() + 1);
500+
}
501+
ResultSets[data.GetQueryResultIndex()].emplace_back() = std::move(*data.MutableResultSet());
504502
}
505503

506504
THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
@@ -517,7 +515,6 @@ class TJsonQuery : public TViewerPipeClient {
517515
if (SessionId) {
518516
auto event = std::make_unique<NKqp::TEvKqp::TEvCancelQueryRequest>();
519517
event->Record.MutableRequest()->SetSessionId(SessionId);
520-
BLOG_TRACE("Cancelling query in session " << SessionId);
521518
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.release());
522519
error << ", query was cancelled";
523520
}
@@ -549,11 +546,11 @@ class TJsonQuery : public TViewerPipeClient {
549546
for (const auto& result : response.GetResults()) {
550547
Ydb::ResultSet resultSet;
551548
NKqp::ConvertKqpQueryResultToDbResult(result, &resultSet);
552-
ResultSets.emplace_back(std::move(resultSet));
549+
ResultSets.emplace_back().emplace_back(std::move(resultSet));
553550
}
554551

555552
for (const auto& result : response.GetYdbResults()) {
556-
ResultSets.emplace_back(result);
553+
ResultSets.emplace_back().emplace_back(result);
557554
}
558555
}
559556
catch (const std::exception& ex) {
@@ -568,15 +565,16 @@ class TJsonQuery : public TViewerPipeClient {
568565
if (Schema == ESchemaType::Classic) {
569566
NJson::TJsonValue& jsonResults = jsonResponse["result"];
570567
jsonResults.SetType(NJson::JSON_ARRAY);
571-
for (auto it = ResultSets.begin(); it != ResultSets.end(); ++it) {
572-
NYdb::TResultSet resultSet(*it);
573-
const auto& columnsMeta = resultSet.GetColumnsMeta();
574-
NYdb::TResultSetParser rsParser(resultSet);
575-
while (rsParser.TryNextRow()) {
576-
NJson::TJsonValue& jsonRow = jsonResults.AppendValue({});
577-
for (size_t columnNum = 0; columnNum < columnsMeta.size(); ++columnNum) {
578-
const NYdb::TColumn& columnMeta = columnsMeta[columnNum];
579-
jsonRow[columnMeta.Name] = ColumnValueToJsonValue(rsParser.ColumnParser(columnNum));
568+
for (const auto& resultSets : ResultSets) {
569+
for (NYdb::TResultSet resultSet : resultSets) {
570+
const auto& columnsMeta = resultSet.GetColumnsMeta();
571+
NYdb::TResultSetParser rsParser(resultSet);
572+
while (rsParser.TryNextRow()) {
573+
NJson::TJsonValue& jsonRow = jsonResults.AppendValue({});
574+
for (size_t columnNum = 0; columnNum < columnsMeta.size(); ++columnNum) {
575+
const NYdb::TColumn& columnMeta = columnsMeta[columnNum];
576+
jsonRow[columnMeta.Name] = ColumnValueToJsonValue(rsParser.ColumnParser(columnNum));
577+
}
580578
}
581579
}
582580
}
@@ -585,7 +583,7 @@ class TJsonQuery : public TViewerPipeClient {
585583
if (Schema == ESchemaType::Modern) {
586584
{
587585
NJson::TJsonValue& jsonColumns = jsonResponse["columns"];
588-
NYdb::TResultSet resultSet(ResultSets.front());
586+
NYdb::TResultSet resultSet(ResultSets.front().front());
589587
const auto& columnsMeta = resultSet.GetColumnsMeta();
590588
jsonColumns.SetType(NJson::JSON_ARRAY);
591589
for (size_t columnNum = 0; columnNum < columnsMeta.size(); ++columnNum) {
@@ -598,16 +596,17 @@ class TJsonQuery : public TViewerPipeClient {
598596

599597
NJson::TJsonValue& jsonResults = jsonResponse["result"];
600598
jsonResults.SetType(NJson::JSON_ARRAY);
601-
for (auto it = ResultSets.begin(); it != ResultSets.end(); ++it) {
602-
NYdb::TResultSet resultSet(*it);
603-
const auto& columnsMeta = resultSet.GetColumnsMeta();
604-
NYdb::TResultSetParser rsParser(resultSet);
605-
while (rsParser.TryNextRow()) {
606-
NJson::TJsonValue& jsonRow = jsonResults.AppendValue({});
607-
jsonRow.SetType(NJson::JSON_ARRAY);
608-
for (size_t columnNum = 0; columnNum < columnsMeta.size(); ++columnNum) {
609-
NJson::TJsonValue& jsonColumn = jsonRow.AppendValue({});
610-
jsonColumn = ColumnValueToJsonValue(rsParser.ColumnParser(columnNum));
599+
for (const auto& resultSets : ResultSets) {
600+
for (NYdb::TResultSet resultSet : resultSets) {
601+
const auto& columnsMeta = resultSet.GetColumnsMeta();
602+
NYdb::TResultSetParser rsParser(resultSet);
603+
while (rsParser.TryNextRow()) {
604+
NJson::TJsonValue& jsonRow = jsonResults.AppendValue({});
605+
jsonRow.SetType(NJson::JSON_ARRAY);
606+
for (size_t columnNum = 0; columnNum < columnsMeta.size(); ++columnNum) {
607+
NJson::TJsonValue& jsonColumn = jsonRow.AppendValue({});
608+
jsonColumn = ColumnValueToJsonValue(rsParser.ColumnParser(columnNum));
609+
}
611610
}
612611
}
613612
}
@@ -616,47 +615,53 @@ class TJsonQuery : public TViewerPipeClient {
616615
if (Schema == ESchemaType::Multi) {
617616
NJson::TJsonValue& jsonResults = jsonResponse["result"];
618617
jsonResults.SetType(NJson::JSON_ARRAY);
619-
for (auto it = ResultSets.begin(); it != ResultSets.end(); ++it) {
620-
NYdb::TResultSet resultSet(*it);
621-
const auto& columnsMeta = resultSet.GetColumnsMeta();
618+
for (const auto& resultSets : ResultSets) {
622619
NJson::TJsonValue& jsonResult = jsonResults.AppendValue({});
620+
bool hasColumns = false;
621+
for (NYdb::TResultSet resultSet : resultSets) {
622+
if (!hasColumns) {
623+
NJson::TJsonValue& jsonColumns = jsonResult["columns"];
624+
jsonColumns.SetType(NJson::JSON_ARRAY);
625+
const auto& columnsMeta = resultSet.GetColumnsMeta();
626+
for (size_t columnNum = 0; columnNum < columnsMeta.size(); ++columnNum) {
627+
NJson::TJsonValue& jsonColumn = jsonColumns.AppendValue({});
628+
const NYdb::TColumn& columnMeta = columnsMeta[columnNum];
629+
jsonColumn["name"] = columnMeta.Name;
630+
jsonColumn["type"] = columnMeta.Type.ToString();
631+
}
632+
hasColumns = true;
633+
}
623634

624-
NJson::TJsonValue& jsonColumns = jsonResult["columns"];
625-
jsonColumns.SetType(NJson::JSON_ARRAY);
626-
for (size_t columnNum = 0; columnNum < columnsMeta.size(); ++columnNum) {
627-
NJson::TJsonValue& jsonColumn = jsonColumns.AppendValue({});
628-
const NYdb::TColumn& columnMeta = columnsMeta[columnNum];
629-
jsonColumn["name"] = columnMeta.Name;
630-
jsonColumn["type"] = columnMeta.Type.ToString();
631-
}
635+
NJson::TJsonValue& jsonRows = jsonResult["rows"];
636+
NYdb::TResultSetParser rsParser(resultSet);
637+
while (rsParser.TryNextRow()) {
638+
NJson::TJsonValue& jsonRow = jsonRows.AppendValue({});
639+
jsonRow.SetType(NJson::JSON_ARRAY);
640+
for (size_t columnNum = 0; columnNum < rsParser.ColumnsCount(); ++columnNum) {
641+
NJson::TJsonValue& jsonColumn = jsonRow.AppendValue({});
642+
jsonColumn = ColumnValueToJsonValue(rsParser.ColumnParser(columnNum));
643+
}
644+
}
632645

633-
NJson::TJsonValue& jsonRows = jsonResult["rows"];
634-
NYdb::TResultSetParser rsParser(resultSet);
635-
while (rsParser.TryNextRow()) {
636-
NJson::TJsonValue& jsonRow = jsonRows.AppendValue({});
637-
jsonRow.SetType(NJson::JSON_ARRAY);
638-
for (size_t columnNum = 0; columnNum < columnsMeta.size(); ++columnNum) {
639-
NJson::TJsonValue& jsonColumn = jsonRow.AppendValue({});
640-
jsonColumn = ColumnValueToJsonValue(rsParser.ColumnParser(columnNum));
646+
if (resultSet.Truncated()) {
647+
jsonResult["truncated"] = true;
641648
}
642649
}
643-
if (resultSet.Truncated()) {
644-
jsonResult["truncated"] = true;
645-
}
646650
}
647651
}
648652

649653
if (Schema == ESchemaType::Ydb) {
650654
NJson::TJsonValue& jsonResults = jsonResponse["result"];
651655
jsonResults.SetType(NJson::JSON_ARRAY);
652-
for (auto it = ResultSets.begin(); it != ResultSets.end(); ++it) {
653-
NYdb::TResultSet resultSet(*it);
654-
const auto& columnsMeta = resultSet.GetColumnsMeta();
655-
NYdb::TResultSetParser rsParser(resultSet);
656-
while (rsParser.TryNextRow()) {
657-
NJson::TJsonValue& jsonRow = jsonResults.AppendValue({});
658-
TString row = NYdb::FormatResultRowJson(rsParser, columnsMeta, IsBase64Encode ? NYdb::EBinaryStringEncoding::Base64 : NYdb::EBinaryStringEncoding::Unicode);
659-
NJson::ReadJsonTree(row, &jsonRow);
656+
for (const auto& resultSets : ResultSets) {
657+
for (NYdb::TResultSet resultSet : resultSets) {
658+
const auto& columnsMeta = resultSet.GetColumnsMeta();
659+
NYdb::TResultSetParser rsParser(resultSet);
660+
while (rsParser.TryNextRow()) {
661+
NJson::TJsonValue& jsonRow = jsonResults.AppendValue({});
662+
TString row = NYdb::FormatResultRowJson(rsParser, columnsMeta, IsBase64Encode ? NYdb::EBinaryStringEncoding::Base64 : NYdb::EBinaryStringEncoding::Unicode);
663+
NJson::ReadJsonTree(row, &jsonRow);
664+
}
660665
}
661666
}
662667
}

0 commit comments

Comments
 (0)