Skip to content

Commit 876f41f

Browse files
committed
YQ-3745 Shared reading: add raw format (ydb-platform#10294)
1 parent 2b09989 commit 876f41f

File tree

3 files changed

+23
-15
lines changed

3 files changed

+23
-15
lines changed

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev)
591591
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
592592
SRC_LOG_T("TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo());
593593
ui64 partitionId = ev->Get()->Record.GetPartitionId();
594-
YQL_ENSURE(Sessions.count(partitionId), "Unknown partition id");
594+
YQL_ENSURE(Sessions.count(partitionId), "Unknown partition id " << partitionId);
595595
auto it = Sessions.find(partitionId);
596596
if (it == Sessions.end()) {
597597
Stop("Wrong session data");

ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,11 +268,12 @@ class TPqDqIntegration: public TDqIntegrationBase {
268268
}
269269
}
270270

271-
//sharedReading = true;
272-
sharedReading = sharedReading && (format == "json_each_row");
271+
sharedReading = sharedReading && (format == "json_each_row" || (format == "raw"));
273272
TString predicateSql = NYql::FormatWhere(predicateProto);
274273
if (sharedReading) {
275-
srcDesc.SetPredicate(predicateSql);
274+
if (format == "json_each_row") {
275+
srcDesc.SetPredicate(predicateSql);
276+
}
276277
srcDesc.SetSharedReading(true);
277278
}
278279
protoSettings.PackFrom(srcDesc);

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -83,29 +83,36 @@ def wait_row_dispatcher_sensor_value(kikimr, sensor, expected_count, exact_match
8383
class TestPqRowDispatcher(TestYdsBase):
8484

8585
@yq_v1
86-
def test_read_raw_format_without_row_dispatcher(self, kikimr, client):
86+
def test_read_raw_format_with_row_dispatcher(self, kikimr, client):
8787
client.create_yds_connection(
8888
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
8989
)
9090
self.init_topics("test_read_raw_format_without_row_dispatcher", create_output=False)
91-
9291
output_topic = "pq_test_pq_read_write_output"
93-
9492
create_stream(output_topic, partitions_count=1)
9593
create_read_rule(output_topic, self.consumer_name)
9694

97-
sql = Rf'''
98-
INSERT INTO {YDS_CONNECTION}.`{output_topic}`
99-
SELECT * FROM {YDS_CONNECTION}.`{self.input_topic}`;'''
95+
sql1 = Rf'''INSERT INTO {YDS_CONNECTION}.`{output_topic}`
96+
SELECT * FROM {YDS_CONNECTION}.`{self.input_topic}` WITH (format=raw, SCHEMA (data String NOT NULL));'''
10097

101-
query_id = start_yds_query(kikimr, client, sql)
98+
query_id = start_yds_query(kikimr, client, sql1)
10299
data = ['{"time" = 101;}', '{"time" = 102;}']
103100

104101
self.write_stream(data)
105-
expected = data
106-
assert self.read_stream(len(expected), topic_path=output_topic) == expected
102+
assert self.read_stream(len(data), topic_path=output_topic) == data
103+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
104+
stop_yds_query(client, query_id)
105+
106+
sql2 = Rf'''INSERT INTO {YDS_CONNECTION}.`{output_topic}`
107+
SELECT * FROM {YDS_CONNECTION}.`{self.input_topic}` WITH (format=raw, SCHEMA (data String NOT NULL))
108+
WHERE data != "romashka";'''
107109

108-
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
110+
query_id = start_yds_query(kikimr, client, sql2)
111+
data = ['{"time" = 103;}', '{"time" = 104;}']
112+
113+
self.write_stream(data)
114+
assert self.read_stream(len(data), topic_path=output_topic) == data
115+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
109116
stop_yds_query(client, query_id)
110117

111118
@yq_v1
@@ -193,7 +200,7 @@ def test_scheme_error(self, kikimr, client):
193200

194201
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
195202
issues = str(client.describe_query(query_id).result.query.issue)
196-
assert "Failed to unwrap empty optional" in issues, "Incorrect Issues: " + issues
203+
assert "Cannot parse JSON string" in issues, "Incorrect Issues: " + issues
197204

198205
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 0)
199206
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)

0 commit comments

Comments
 (0)