Skip to content

Commit 0993fe0

Browse files
authored
fix: FlightData must contain valid data header (#15559)
* fix: FlightData must contain valid data header Signed-off-by: Xuanwo <github@xuanwo.io> * Fix toml format Signed-off-by: Xuanwo <github@xuanwo.io> --------- Signed-off-by: Xuanwo <github@xuanwo.io>
1 parent c1b09ab commit 0993fe0

File tree

4 files changed

+45
-6
lines changed

4 files changed

+45
-6
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,8 @@ arrow-schema = { version = "51", features = ["serde"] }
223223
arrow-select = { version = "51" }
224224
parquet = { version = "51", features = ["async"] }
225225
parquet_rs = { package = "parquet", version = "51" }
226+
# Must use the same version as arrow-ipc
227+
flatbuffers = { version = "23" }
226228

227229
# Crates from risingwavelabs
228230
arrow-udf-js = { package = "arrow-udf-js", git = "https://github.com/datafuse-extras/arrow-udf", rev = "d0a21f0" }

src/query/service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ ctor = "0.1.26"
125125
dashmap = { workspace = true }
126126
derive-visitor = { workspace = true }
127127
ethnum = { workspace = true }
128+
flatbuffers = { workspace = true }
128129
futures = { workspace = true }
129130
futures-util = { workspace = true }
130131
headers = "0.4.0"

src/query/service/src/servers/flight_sql/flight_sql_service/query.rs

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,21 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::ops::Deref;
1516
use std::sync::atomic::AtomicBool;
1617
use std::sync::atomic::Ordering;
1718
use std::sync::Arc;
19+
use std::sync::LazyLock;
1820

1921
use arrow_flight::FlightData;
2022
use arrow_flight::SchemaAsIpc;
2123
use arrow_ipc::writer;
2224
use arrow_ipc::writer::IpcWriteOptions;
25+
use arrow_ipc::MessageBuilder;
26+
use arrow_ipc::MessageHeader;
27+
use arrow_ipc::MetadataVersion;
2328
use arrow_schema::Schema as ArrowSchema;
29+
use bytes::Bytes;
2430
use databend_common_base::base::tokio;
2531
use databend_common_exception::ErrorCode;
2632
use databend_common_exception::Result;
@@ -33,6 +39,7 @@ use databend_common_sql::Planner;
3339
use databend_common_storages_fuse::TableContext;
3440
use futures::Stream;
3541
use futures::StreamExt;
42+
use prost::bytes;
3643
use serde::Deserialize;
3744
use serde::Serialize;
3845
use tonic::Status;
@@ -47,6 +54,29 @@ use crate::sessions::Session;
4754
/// An app_metadata key which indicates the data is a progress type
4855
static H_PROGRESS: u8 = 0x01;
4956

57+
/// The generated app metadata for our progress.
58+
static APP_METADATA_PROGRESS: LazyLock<Bytes> = LazyLock::new(|| Bytes::from(vec![H_PROGRESS]));
59+
60+
/// The data header for our progress.
61+
///
62+
/// This build process is inspired by [arrow_ipc::writer::IpcDataGenerator](https://docs.rs/arrow-ipc/51.0.0/arrow_ipc/writer/struct.IpcDataGenerator.html#method.schema_to_bytes)
63+
static DATA_HEADER_PROGRESS: LazyLock<Bytes> = LazyLock::new(|| {
64+
let mut fbb = flatbuffers::FlatBufferBuilder::new();
65+
66+
let mut builder = MessageBuilder::new(&mut fbb);
67+
// Use the same version as arrow_ipc.
68+
builder.add_version(MetadataVersion::V5);
69+
// Use NONE as the header type.
70+
builder.add_header_type(MessageHeader::NONE);
71+
// We don't have other data to write in this message, just finish.
72+
let data = builder.finish();
73+
74+
// Finish the flatbuffer.
75+
fbb.finish(data, None);
76+
77+
Bytes::copy_from_slice(fbb.finished_data())
78+
});
79+
5080
impl FlightSqlServiceImpl {
5181
pub(crate) fn schema_to_flight_data(data_schema: DataSchema) -> FlightData {
5282
let arrow_schema = ArrowSchema::from(&data_schema);
@@ -69,6 +99,16 @@ impl FlightSqlServiceImpl {
6999
Ok(encoded_batch.into())
70100
}
71101

102+
fn progress_to_flight_data(progress: &ProgressValue) -> Result<FlightData> {
103+
let progress = serde_json::to_vec(&progress)
104+
.map_err(|e| ErrorCode::Internal(format!("encode progress into json failed: {e:?}")))?;
105+
106+
Ok(FlightData::new()
107+
.with_app_metadata(APP_METADATA_PROGRESS.deref().clone())
108+
.with_data_header(DATA_HEADER_PROGRESS.deref().clone())
109+
.with_data_body(Bytes::from(progress)))
110+
}
111+
72112
#[async_backtrace::framed]
73113
pub async fn plan_sql(
74114
&self,
@@ -211,12 +251,7 @@ impl FlightSqlServiceImpl {
211251
progress.write_bytes = write_progress.bytes;
212252
}
213253

214-
let progress = serde_json::to_vec(&progress).unwrap();
215-
Some(FlightData {
216-
app_metadata: vec![H_PROGRESS].into(),
217-
data_body: progress.into(),
218-
..Default::default()
219-
})
254+
Some(Self::progress_to_flight_data(&progress).unwrap())
220255
};
221256

222257
while !is_finished.load(Ordering::SeqCst) {

0 commit comments

Comments
 (0)