Skip to content

Commit 43332a1

Browse files
committed
fix(cluster): fix precommit deser and ser
1 parent 6526bd3 commit 43332a1

File tree

7 files changed

+80
-8
lines changed

7 files changed

+80
-8
lines changed

src/query/service/src/api/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub use rpc::FragmentPayload;
3030
pub use rpc::FragmentPlanPacket;
3131
pub use rpc::InitNodesChannelPacket;
3232
pub use rpc::MergeExchange;
33+
pub use rpc::PrecommitBlock;
3334
pub use rpc::QueryFragmentsPlanPacket;
3435
pub use rpc::ServerFlightExchange;
3536
pub use rpc::ShuffleDataExchange;

src/query/service/src/api/rpc/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,5 @@ pub use packets::FragmentPayload;
4343
pub use packets::FragmentPlanPacket;
4444
pub use packets::InitNodesChannelPacket;
4545
pub use packets::Packet;
46+
pub use packets::PrecommitBlock;
4647
pub use packets::QueryFragmentsPlanPacket;

src/query/service/src/api/rpc/packets/packet_data.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ use byteorder::ReadBytesExt;
2626
use byteorder::WriteBytesExt;
2727
use common_arrow::arrow::io::flight::deserialize_batch;
2828
use common_arrow::arrow::io::flight::serialize_batch;
29-
use common_arrow::arrow::io::flight::serialize_schema_to_info;
30-
use common_arrow::arrow::io::ipc::read::deserialize_schema;
3129
use common_arrow::arrow::io::ipc::write::default_ipc_fields;
3230
use common_arrow::arrow::io::ipc::write::WriteOptions;
3331
use common_arrow::arrow::io::ipc::IpcSchema;
@@ -67,7 +65,7 @@ pub enum ProgressInfo {
6765
ResultProgress(ProgressValues),
6866
}
6967

70-
#[derive(Debug)]
68+
#[derive(Debug, Eq, PartialEq, Clone)]
7169
pub struct PrecommitBlock(pub DataBlock);
7270

7371
#[derive(Debug)]
@@ -187,20 +185,21 @@ impl From<FragmentData> for FlightData {
187185
impl PrecommitBlock {
188186
pub fn write<T: Write>(self, bytes: &mut T) -> Result<()> {
189187
let data_block = self.0;
190-
let arrow_schema = data_block.schema().to_arrow();
191-
let schema = serialize_schema_to_info(&arrow_schema, None)?;
188+
let data_schema = data_block.schema();
189+
let serialized_schema = serde_json::to_vec(data_schema)?;
190+
let arrow_schema = data_schema.to_arrow();
192191

193192
// schema_flight
194193
let options = WriteOptions { compression: None };
195194
let ipc_fields = default_ipc_fields(&arrow_schema.fields);
196195
let chunks = data_block.try_into()?;
197196
let (_dicts, data_flight) = serialize_batch(&chunks, &ipc_fields, &options)?;
198197

199-
bytes.write_u64::<BigEndian>(schema.len() as u64)?;
198+
bytes.write_u64::<BigEndian>(serialized_schema.len() as u64)?;
200199
bytes.write_u64::<BigEndian>(data_flight.data_header.len() as u64)?;
201200
bytes.write_u64::<BigEndian>(data_flight.data_body.len() as u64)?;
202201

203-
bytes.write_all(&schema)?;
202+
bytes.write_all(&serialized_schema)?;
204203
bytes.write_all(&data_flight.data_header)?;
205204
bytes.write_all(&data_flight.data_body)?;
206205
Ok(())
@@ -218,7 +217,8 @@ impl PrecommitBlock {
218217
bytes.read_exact(&mut schema)?;
219218
bytes.read_exact(&mut flight_header)?;
220219
bytes.read_exact(&mut flight_body)?;
221-
let (arrow_schema, _) = deserialize_schema(&schema)?;
220+
let data_schema = serde_json::from_slice::<DataSchema>(&schema)?;
221+
let arrow_schema = data_schema.to_arrow();
222222

223223
let ipc_fields = default_ipc_fields(&arrow_schema.fields);
224224
let ipc_schema = IpcSchema {

src/query/service/tests/it/api/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@
1414

1515
mod http;
1616
mod http_service;
17+
mod rpc;
1718
mod rpc_service;
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod packets;
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod packet_data;
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use common_base::base::tokio;
18+
use common_datablocks::DataBlock;
19+
use common_datavalues::prelude::*;
20+
use common_datavalues::DataField;
21+
use common_datavalues::DataSchema;
22+
use common_datavalues::UInt8Column;
23+
use common_exception::Result;
24+
use databend_query::api::PrecommitBlock;
25+
26+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
27+
async fn test_precommit_ser_and_deser() -> Result<()> {
28+
let data_schema = DataSchema::new(vec![DataField::new("test", UInt8Type::new_impl())]);
29+
30+
let test_precommit = PrecommitBlock(DataBlock::create(Arc::new(data_schema), vec![Arc::new(
31+
UInt8Column::new_from_vec(vec![1, 2, 3]),
32+
)]));
33+
34+
let mut bytes = vec![];
35+
PrecommitBlock::write(test_precommit.clone(), &mut bytes)?;
36+
let mut read = bytes.as_slice();
37+
assert_eq!(test_precommit, PrecommitBlock::read(&mut read)?);
38+
Ok(())
39+
}

0 commit comments

Comments
 (0)