Skip to content

Commit b0af919

Browse files
authored
Fix mysql blob & text types (#117)
1 parent 15d97b8 commit b0af919

File tree

3 files changed

+154
-22
lines changed

3 files changed

+154
-22
lines changed

src/sql/arrow_sql_gen/mysql.rs

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ use crate::sql::arrow_sql_gen::arrow::map_data_type_to_array_builder_optional;
22
use arrow::{
33
array::{
44
ArrayBuilder, ArrayRef, BinaryBuilder, Date32Builder, Decimal256Builder, Decimal128Builder, Float32Builder,
5-
Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, LargeStringBuilder,
6-
NullBuilder, RecordBatch, RecordBatchOptions, StringBuilder, Time64NanosecondBuilder,
7-
TimestampMicrosecondBuilder, UInt64Builder,
5+
Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, LargeBinaryBuilder,
6+
LargeStringBuilder, NullBuilder, RecordBatch, RecordBatchOptions,
7+
StringBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder, UInt64Builder,
88
},
99
datatypes::{i256, DataType, Date32Type, Field, Schema, SchemaRef, TimeUnit},
1010
};
@@ -92,13 +92,15 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
9292
let mut mysql_types: Vec<ColumnType> = Vec::new();
9393
let mut column_names: Vec<String> = Vec::new();
9494
let mut column_is_binary_stats: Vec<bool> = Vec::new();
95+
let mut column_use_large_str_or_blob_stats: Vec<bool> = Vec::new();
9596

9697
if !rows.is_empty() {
9798
let row = &rows[0];
9899
for column in row.columns().iter() {
99100
let column_name = column.name_str();
100101
let column_type = column.column_type();
101102
let column_is_binary = column.flags().contains(ColumnFlags::BINARY_FLAG);
103+
let column_use_large_str_or_blob = column.column_length() > 2_u32.pow(31) - 1;
102104

103105
let (decimal_precision, decimal_scale) = match column_type {
104106
ColumnType::MYSQL_TYPE_DECIMAL | ColumnType::MYSQL_TYPE_NEWDECIMAL => {
@@ -118,6 +120,7 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
118120
let data_type = map_column_to_data_type(
119121
column_type,
120122
column_is_binary,
123+
column_use_large_str_or_blob,
121124
decimal_precision,
122125
decimal_scale,
123126
);
@@ -132,6 +135,7 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
132135
mysql_types.push(column_type);
133136
column_names.push(column_name.to_string());
134137
column_is_binary_stats.push(column_is_binary);
138+
column_use_large_str_or_blob_stats.push(column_use_large_str_or_blob);
135139
}
136140
}
137141

@@ -296,10 +300,6 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
296300
}
297301
column_type @ (ColumnType::MYSQL_TYPE_VARCHAR
298302
| ColumnType::MYSQL_TYPE_JSON
299-
| ColumnType::MYSQL_TYPE_TINY_BLOB
300-
| ColumnType::MYSQL_TYPE_BLOB
301-
| ColumnType::MYSQL_TYPE_MEDIUM_BLOB
302-
| ColumnType::MYSQL_TYPE_LONG_BLOB
303303
| ColumnType::MYSQL_TYPE_ENUM) => {
304304
handle_primitive_type!(
305305
builder,
@@ -310,6 +310,45 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
310310
i
311311
);
312312
}
313+
ColumnType::MYSQL_TYPE_BLOB => {
314+
match (
315+
column_use_large_str_or_blob_stats[i],
316+
column_is_binary_stats[i],
317+
) {
318+
(true, true) => handle_primitive_type!(
319+
builder,
320+
ColumnType::MYSQL_TYPE_BLOB,
321+
LargeBinaryBuilder,
322+
Vec<u8>,
323+
row,
324+
i
325+
),
326+
(true, false) => handle_primitive_type!(
327+
builder,
328+
ColumnType::MYSQL_TYPE_BLOB,
329+
LargeStringBuilder,
330+
String,
331+
row,
332+
i
333+
),
334+
(false, true) => handle_primitive_type!(
335+
builder,
336+
ColumnType::MYSQL_TYPE_BLOB,
337+
BinaryBuilder,
338+
Vec<u8>,
339+
row,
340+
i
341+
),
342+
(false, false) => handle_primitive_type!(
343+
builder,
344+
ColumnType::MYSQL_TYPE_BLOB,
345+
StringBuilder,
346+
String,
347+
row,
348+
i
349+
),
350+
}
351+
}
313352
column_type @ (ColumnType::MYSQL_TYPE_STRING
314353
| ColumnType::MYSQL_TYPE_VAR_STRING) => {
315354
if column_is_binary_stats[i] {
@@ -424,6 +463,7 @@ pub fn rows_to_arrow(rows: &[Row], projected_schema: &Option<SchemaRef>) -> Resu
424463
pub fn map_column_to_data_type(
425464
column_type: ColumnType,
426465
column_is_binary: bool,
466+
column_use_large_str_or_blob: bool,
427467
column_decimal_precision: Option<u8>,
428468
column_decimal_scale: Option<i8>,
429469
) -> Option<DataType> {
@@ -453,11 +493,18 @@ pub fn map_column_to_data_type(
453493
ColumnType::MYSQL_TYPE_VARCHAR
454494
| ColumnType::MYSQL_TYPE_JSON
455495
| ColumnType::MYSQL_TYPE_ENUM
456-
| ColumnType::MYSQL_TYPE_SET
457-
| ColumnType::MYSQL_TYPE_TINY_BLOB
458-
| ColumnType::MYSQL_TYPE_BLOB
459-
| ColumnType::MYSQL_TYPE_MEDIUM_BLOB
460-
| ColumnType::MYSQL_TYPE_LONG_BLOB => Some(DataType::LargeUtf8),
496+
| ColumnType::MYSQL_TYPE_SET => Some(DataType::LargeUtf8),
497+
// MYSQL_TYPE_BLOB covers TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, TINYTEXT, TEXT, MEDIUMTEXT and LONGTEXT; see https://dev.mysql.com/doc/c-api/8.0/en/c-api-data-structures.html
498+
// MySQL String Type Storage requirement: https://dev.mysql.com/doc/refman/8.4/en/storage-requirements.html
499+
// Arrow Binary / Utf8 can store binary / non-binary strings of up to 2^31 - 1 bytes; longer columns are mapped to LargeBinary / LargeUtf8
500+
ColumnType::MYSQL_TYPE_BLOB => {
501+
match (column_use_large_str_or_blob, column_is_binary) {
502+
(true, true) => Some(DataType::LargeBinary),
503+
(true, false) => Some(DataType::LargeUtf8),
504+
(false, true) => Some(DataType::Binary),
505+
(false, false) => Some(DataType::Utf8),
506+
}
507+
}
461508
ColumnType::MYSQL_TYPE_STRING
462509
| ColumnType::MYSQL_TYPE_VAR_STRING => {
463510
if column_is_binary {
@@ -475,6 +522,9 @@ pub fn map_column_to_data_type(
475522
| ColumnType::MYSQL_TYPE_TIMESTAMP2
476523
| ColumnType::MYSQL_TYPE_DATETIME2
477524
| ColumnType::MYSQL_TYPE_TIME2
525+
| ColumnType::MYSQL_TYPE_LONG_BLOB
526+
| ColumnType::MYSQL_TYPE_TINY_BLOB
527+
| ColumnType::MYSQL_TYPE_MEDIUM_BLOB
478528
| ColumnType::MYSQL_TYPE_GEOMETRY => {
479529
unimplemented!("Unsupported column type {:?}", column_type)
480530
}

src/sql/db_connection_pool/dbconnection/mysqlconn.rs

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ fn columns_meta_to_schema(columns_meta: Vec<Row>) -> Result<SchemaRef> {
185185

186186
let column_type = map_str_type_to_column_type(&data_type)?;
187187
let column_is_binary = map_str_type_to_is_binary(&data_type);
188+
let column_use_large_str_or_blob = map_str_type_to_use_large_str_or_blob(&data_type);
188189

189190
let (precision, scale) = match column_type {
190191
ColumnType::MYSQL_TYPE_DECIMAL | ColumnType::MYSQL_TYPE_NEWDECIMAL => {
@@ -195,9 +196,14 @@ fn columns_meta_to_schema(columns_meta: Vec<Row>) -> Result<SchemaRef> {
195196
_ => (None, None),
196197
};
197198

198-
let arrow_data_type =
199-
map_column_to_data_type(column_type, column_is_binary, precision, scale)
200-
.context(UnsupportedDataTypeSnafu { data_type })?;
199+
let arrow_data_type = map_column_to_data_type(
200+
column_type,
201+
column_is_binary,
202+
column_use_large_str_or_blob,
203+
precision,
204+
scale,
205+
)
206+
.context(UnsupportedDataTypeSnafu { data_type })?;
201207

202208
fields.push(Field::new(&column_name, arrow_data_type, true));
203209
}
@@ -231,12 +237,12 @@ fn map_str_type_to_column_type(data_type: &str) -> Result<ColumnType> {
231237
_ if data_type.starts_with("newdecimal") => ColumnType::MYSQL_TYPE_NEWDECIMAL,
232238
_ if data_type.starts_with("enum") => ColumnType::MYSQL_TYPE_ENUM,
233239
_ if data_type.starts_with("set") => ColumnType::MYSQL_TYPE_SET,
234-
_ if data_type.starts_with("tinyblob") => ColumnType::MYSQL_TYPE_TINY_BLOB,
235-
_ if data_type.starts_with("tinytext") => ColumnType::MYSQL_TYPE_TINY_BLOB,
236-
_ if data_type.starts_with("mediumblob") => ColumnType::MYSQL_TYPE_MEDIUM_BLOB,
237-
_ if data_type.starts_with("mediumtext") => ColumnType::MYSQL_TYPE_MEDIUM_BLOB,
238-
_ if data_type.starts_with("longblob") => ColumnType::MYSQL_TYPE_LONG_BLOB,
239-
_ if data_type.starts_with("longtext") => ColumnType::MYSQL_TYPE_LONG_BLOB,
240+
_ if data_type.starts_with("tinyblob") => ColumnType::MYSQL_TYPE_BLOB,
241+
_ if data_type.starts_with("tinytext") => ColumnType::MYSQL_TYPE_BLOB,
242+
_ if data_type.starts_with("mediumblob") => ColumnType::MYSQL_TYPE_BLOB,
243+
_ if data_type.starts_with("mediumtext") => ColumnType::MYSQL_TYPE_BLOB,
244+
_ if data_type.starts_with("longblob") => ColumnType::MYSQL_TYPE_BLOB,
245+
_ if data_type.starts_with("longtext") => ColumnType::MYSQL_TYPE_BLOB,
240246
_ if data_type.starts_with("blob") => ColumnType::MYSQL_TYPE_BLOB,
241247
_ if data_type.starts_with("text") => ColumnType::MYSQL_TYPE_BLOB,
242248
_ if data_type.starts_with("varchar") => ColumnType::MYSQL_TYPE_VAR_STRING,
@@ -251,7 +257,20 @@ fn map_str_type_to_column_type(data_type: &str) -> Result<ColumnType> {
251257
}
252258

253259
/// Reports whether a MySQL type name denotes a binary (byte-string) column.
///
/// `data_type` is matched by prefix, so a trailing length such as
/// `varbinary(16)` is accepted. The comparison is case-sensitive and only
/// lowercase prefixes are listed.
/// NOTE(review): presumably callers pass the lowercase type name from the
/// column metadata — confirm against the caller.
///
/// Returns `true` for `binary`, `varbinary`, `tinyblob`, `blob`, `mediumblob`
/// and `longblob`; every other name (including the `*text` types) yields `false`.
fn map_str_type_to_is_binary(data_type: &str) -> bool {
    // Type-name prefixes whose values are raw bytes rather than character data.
    const BINARY_TYPE_PREFIXES: [&str; 6] = [
        "binary",
        "varbinary",
        "tinyblob",
        "mediumblob",
        "blob",
        "longblob",
    ];

    BINARY_TYPE_PREFIXES
        .iter()
        .any(|prefix| data_type.starts_with(*prefix))
}
271+
272+
fn map_str_type_to_use_large_str_or_blob(data_type: &str) -> bool {
273+
if data_type.starts_with("long") {
255274
return true;
256275
}
257276
false

tests/mysql/mod.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,68 @@ VALUES
272272
.await;
273273
}
274274

275+
async fn test_mysql_blob_types(port: usize) {
276+
let create_table_stmt = "
277+
CREATE TABLE blobs_table (
278+
tinyblob_col TINYBLOB,
279+
tinytext_col TINYTEXT,
280+
mediumblob_col MEDIUMBLOB,
281+
mediumtext_col MEDIUMTEXT,
282+
blob_col BLOB,
283+
text_col TEXT,
284+
longblob_col LONGBLOB,
285+
longtext_col LONGTEXT
286+
);
287+
";
288+
let insert_table_stmt = "
289+
INSERT INTO blobs_table (
290+
tinyblob_col, tinytext_col, mediumblob_col, mediumtext_col, blob_col, text_col, longblob_col, longtext_col
291+
)
292+
VALUES
293+
(
294+
'small_blob', 'small_text',
295+
'medium_blob', 'medium_text',
296+
'larger_blob', 'larger_text',
297+
'very_large_blob', 'very_large_text'
298+
);
299+
";
300+
301+
let schema = Arc::new(Schema::new(vec![
302+
Field::new("tinyblob_col", DataType::Binary, true),
303+
Field::new("tinytext_col", DataType::Utf8, true),
304+
Field::new("mediumblob_col", DataType::Binary, true),
305+
Field::new("mediumtext_col", DataType::Utf8, true),
306+
Field::new("blob_col", DataType::Binary, true),
307+
Field::new("text_col", DataType::Utf8, true),
308+
Field::new("longblob_col", DataType::LargeBinary, true),
309+
Field::new("longtext_col", DataType::LargeUtf8, true),
310+
]));
311+
312+
let expected_record = RecordBatch::try_new(
313+
Arc::clone(&schema),
314+
vec![
315+
Arc::new(BinaryArray::from_vec(vec![b"small_blob"])),
316+
Arc::new(StringArray::from(vec!["small_text"])),
317+
Arc::new(BinaryArray::from_vec(vec![b"medium_blob"])),
318+
Arc::new(StringArray::from(vec!["medium_text"])),
319+
Arc::new(BinaryArray::from_vec(vec![b"larger_blob"])),
320+
Arc::new(StringArray::from(vec!["larger_text"])),
321+
Arc::new(LargeBinaryArray::from_vec(vec![b"very_large_blob"])),
322+
Arc::new(LargeStringArray::from(vec!["very_large_text"])),
323+
],
324+
)
325+
.expect("Failed to created arrow record batch");
326+
327+
arrow_mysql_one_way(
328+
port,
329+
"blobs_table",
330+
create_table_stmt,
331+
insert_table_stmt,
332+
expected_record,
333+
)
334+
.await;
335+
}
336+
275337
async fn test_mysql_string_types(port: usize) {
276338
let create_table_stmt = "
277339
CREATE TABLE string_table (
@@ -489,6 +551,7 @@ async fn test_mysql_arrow_oneway() {
489551
test_mysql_timestamp_types(port).await;
490552
test_mysql_datetime_types(port).await;
491553
test_mysql_time_types(port).await;
554+
test_mysql_blob_types(port).await;
492555
test_mysql_string_types(port).await;
493556
test_mysql_decimal_types_to_decimal128(port).await;
494557
test_mysql_decimal_types_to_decimal256(port).await;

0 commit comments

Comments
 (0)