Skip to content

Commit c53bc01

Browse files
Merge pull request #331 from apecloud/feat/migrate_multi_tables_in_parallel
fix mysql table structure migration errors, for: default values and comments
2 parents 17ff082 + 351f716 commit c53bc01

File tree

19 files changed

+378
-63
lines changed

19 files changed

+378
-63
lines changed

dt-common/src/meta/struct_meta/statement/mysql_create_table_statement.rs

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::meta::struct_meta::structure::column::ColumnDefault;
12
use crate::{config::config_enums::DbType, rdb_filter::RdbFilter};
23

34
use crate::meta::struct_meta::structure::{
@@ -108,65 +109,71 @@ impl MysqlCreateTableStatement {
108109
}
109110

110111
if !table.table_comment.is_empty() {
111-
sql = format!("{} COMMENT='{}'", sql, table.table_comment);
112+
sql = format!("{} COMMENT='{}'", sql, Self::escape(&table.table_comment));
112113
}
113114

114115
sql
115116
}
116117

117118
fn columns_to_sql(columns: &mut Vec<Column>) -> (String, Vec<String>) {
118-
let (mut sql, mut pks) = (String::new(), Vec::new());
119+
let (mut sql_lines, mut pks) = (Vec::new(), Vec::new());
119120

120121
columns.sort_by(|c1, c2| c1.ordinal_position.cmp(&c2.ordinal_position));
121-
for i in columns {
122-
sql.push_str(&format!("`{}` {} ", i.column_name, i.column_type));
122+
for i in columns.iter() {
123+
let mut line = String::new();
124+
line.push_str(&format!("`{}` {}", i.column_name, i.column_type));
123125

124126
if !i.character_set_name.is_empty() {
125-
sql.push_str(&format!("CHARACTER SET {} ", i.character_set_name))
127+
line.push_str(&format!(" CHARACTER SET {}", i.character_set_name))
126128
}
127129

128130
if !i.collation_name.is_empty() {
129-
sql.push_str(&format!("COLLATE {} ", i.collation_name))
131+
line.push_str(&format!(" COLLATE {}", i.collation_name))
130132
}
131133

132-
if let Some(v) = &i.column_default {
133-
if v.to_lowercase().starts_with("current_") {
134-
sql.push_str(&format!("DEFAULT {} ", v));
135-
} else {
136-
sql.push_str(&format!("DEFAULT '{}' ", v));
134+
match &i.column_default {
135+
Some(ColumnDefault::Expression(v)) => line.push_str(&format!(" DEFAULT {}", v)),
136+
Some(ColumnDefault::Literal(v)) => {
137+
if i.column_type.to_lowercase().starts_with("bit") {
138+
// https://github.com/apecloud/ape-dts/issues/319
139+
// CREATE TABLE a(b bit(1) default b'1');
140+
line.push_str(&format!(" DEFAULT {}", v))
141+
} else {
142+
line.push_str(&format!(" DEFAULT '{}'", Self::escape(v)))
143+
}
137144
}
145+
_ => {}
138146
}
139147

140-
if !i.extra.is_empty() {
141-
// DEFAULT_GENERATED
142-
// DEFAULT_GENERATED on update CURRENT_TIMESTAMP
143-
sql.push_str(&format!("{} ", i.extra.replace("DEFAULT_GENERATED", "")));
148+
// auto_increment
149+
// on update CURRENT_TIMESTAMP
150+
// mysql 8.0:
151+
// DEFAULT_GENERATED
152+
// DEFAULT_GENERATED on update CURRENT_TIMESTAMP
153+
let extra = i.extra.replacen("DEFAULT_GENERATED", "", 1);
154+
if !extra.is_empty() {
155+
line.push_str(&format!(" {}", extra));
144156
}
145157

146158
let nullable = if !i.is_nullable {
147-
String::from("NOT NULL ")
159+
String::from("NOT NULL")
148160
} else {
149-
String::from("NULL ")
161+
String::from("NULL")
150162
};
151163

152164
if !i.column_comment.is_empty() {
153-
sql.push_str(&format!("COMMENT '{}' ", i.column_comment))
165+
line.push_str(&format!(" COMMENT '{}'", Self::escape(&i.column_comment)))
154166
}
155167

156-
sql.push_str(&format!("{} ", nullable));
157-
158-
sql.push(',');
168+
line.push_str(&format!(" {}", nullable));
169+
sql_lines.push(line);
159170

160171
if i.column_key == "PRI" {
161172
pks.push(i.column_name.clone());
162173
}
163174
}
164175

165-
if sql.ends_with(',') {
166-
sql = sql[0..sql.len() - 1].to_string();
167-
}
168-
169-
(sql, pks)
176+
(sql_lines.join(", "), pks)
170177
}
171178

172179
fn index_to_sql(index: &mut Index) -> String {
@@ -190,7 +197,7 @@ impl MysqlCreateTableStatement {
190197
);
191198

192199
if !index.comment.is_empty() {
193-
sql.push_str(&format!("COMMENT '{}' ", index.comment));
200+
sql.push_str(&format!("COMMENT '{}' ", Self::escape(&index.comment)));
194201
}
195202

196203
sql
@@ -207,4 +214,8 @@ impl MysqlCreateTableStatement {
207214
constraint.definition
208215
)
209216
}
217+
218+
/// Escapes `text` so it can be embedded inside a single-quoted SQL string
/// literal, by doubling every single quote (`'` becomes `''`).
///
/// All other characters are returned unchanged.
///
/// NOTE(review): backslashes are passed through as-is. MySQL treats `\` as an
/// escape character inside string literals unless `NO_BACKSLASH_ESCAPES` is
/// enabled, so a comment or default containing `\` may be altered on the
/// target. Confirm the sinker's `sql_mode` before changing this.
fn escape(text: &str) -> String {
    // `str::replace` already yields an owned `String`; no further copy needed.
    text.replace('\'', "''")
}
210221
}

dt-common/src/meta/struct_meta/statement/pg_create_table_statement.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::config::config_enums::DbType;
44
use crate::error::Error;
55
use crate::meta::ddl_meta::ddl_parser::DdlParser;
66
use crate::meta::ddl_meta::ddl_statement::DdlStatement;
7+
use crate::meta::struct_meta::structure::column::ColumnDefault;
78
use crate::rdb_filter::RdbFilter;
89

910
use crate::meta::struct_meta::structure::{
@@ -156,7 +157,9 @@ impl PgCreateTableStatement {
156157
sql.push_str("NOT NULL ");
157158
}
158159
match &column.column_default {
159-
Some(x) => sql.push_str(format!("DEFAULT {} ", x).as_str()),
160+
Some(ColumnDefault::Expression(v)) | Some(ColumnDefault::Literal(v)) => {
161+
sql.push_str(format!("DEFAULT {} ", v).as_str())
162+
}
160163
None => {}
161164
}
162165
match &column.generated {

dt-common/src/meta/struct_meta/structure/column.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
pub struct Column {
33
pub column_name: String,
44
pub ordinal_position: u32,
5-
pub column_default: Option<String>,
5+
pub column_default: Option<ColumnDefault>,
66
pub is_nullable: bool,
77
pub column_type: String, // varchar(100)
88
pub column_key: String, // PRI, MUL
@@ -12,3 +12,9 @@ pub struct Column {
1212
pub character_set_name: String,
1313
pub collation_name: String,
1414
}
15+
16+
/// Default value of a column, tagged with how its text must be rendered in DDL.
///
/// The MySQL `CREATE TABLE` generator emits `Expression` text verbatim after
/// `DEFAULT`, and emits `Literal` text as an escaped, single-quoted string
/// (except for `bit` columns, whose literal is emitted verbatim). The
/// PostgreSQL generator emits both variants verbatim.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ColumnDefault {
    /// A constant default value, stored without surrounding quotes.
    Literal(String),
    /// A function call or expression (for example `CURRENT_TIMESTAMP` or a
    /// parenthesized expression) that must not be quoted.
    Expression(String),
}

dt-connector/src/meta_fetcher/mysql/mysql_struct_fetcher.rs

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ use std::{
55

66
use anyhow::bail;
77
use dt_common::meta::{
8-
mysql::mysql_meta_manager::MysqlMetaManager,
8+
mysql::{mysql_col_type::MysqlColType, mysql_meta_manager::MysqlMetaManager},
99
struct_meta::{
1010
statement::{
1111
mysql_create_database_statement::MysqlCreateDatabaseStatement,
1212
mysql_create_table_statement::MysqlCreateTableStatement,
1313
},
1414
structure::{
15-
column::Column,
15+
column::{Column, ColumnDefault},
1616
constraint::{Constraint, ConstraintType},
1717
database::Database,
1818
index::{Index, IndexColumn, IndexKind, IndexType},
@@ -146,14 +146,24 @@ impl MysqlStructFetcher {
146146
let engine_name = Self::get_str_with_null(&row, "ENGINE")?;
147147
let table_comment = Self::get_str_with_null(&row, "TABLE_COMMENT")?;
148148
let is_nullable = Self::get_str_with_null(&row, "IS_NULLABLE")?.to_lowercase() == "yes";
149+
let extra = Self::get_str_with_null(&row, "EXTRA")?;
150+
let column_name = Self::get_str_with_null(&row, "COLUMN_NAME")?;
151+
let column_default = if let Some(column_default_str) = row.get("COLUMN_DEFAULT") {
152+
Some(
153+
self.parse_column_default(&db, &tb, &column_name, column_default_str, &extra)
154+
.await?,
155+
)
156+
} else {
157+
None
158+
};
149159
let column = Column {
150-
column_name: Self::get_str_with_null(&row, "COLUMN_NAME")?,
160+
column_name,
151161
ordinal_position: row.try_get("ORDINAL_POSITION")?,
152-
column_default: row.get("COLUMN_DEFAULT"),
162+
column_default,
153163
is_nullable,
154164
column_type: Self::get_str_with_null(&row, "COLUMN_TYPE")?,
155165
column_key: Self::get_str_with_null(&row, "COLUMN_KEY")?,
156-
extra: Self::get_str_with_null(&row, "EXTRA")?,
166+
extra,
157167
column_comment: Self::get_str_with_null(&row, "COLUMN_COMMENT")?,
158168
character_set_name: Self::get_str_with_null(&row, "CHARACTER_SET_NAME")?,
159169
collation_name: Self::get_str_with_null(&row, "COLLATION_NAME")?,
@@ -184,6 +194,62 @@ impl MysqlStructFetcher {
184194
Ok(results)
185195
}
186196

197+
async fn parse_column_default(
198+
&mut self,
199+
schema: &str,
200+
tb: &str,
201+
col: &str,
202+
column_default_str: &str,
203+
extra: &str,
204+
) -> anyhow::Result<ColumnDefault> {
205+
// https://dev.mysql.com/doc/refman/8.4/en/data-type-defaults.html
206+
// https://dev.mysql.com/doc/refman/5.7/en/data-type-defaults.html
207+
let str = column_default_str.to_string();
208+
// 5.7 case: CREATE TABLE a (d DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP);
209+
// | COLUMN_DEFAULT | EXTRA |
210+
// | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
211+
// 8.0 case 1: CREATE TABLE a (d DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP);
212+
// | COLUMN_DEFAULT | EXTRA |
213+
// | CURRENT_TIMESTAMP | DEFAULT_GENERATED on update CURRENT_TIMESTAMP |
214+
// 8.0 case 2: CREATE TABLE a (t TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
215+
// | COLUMN_DEFAULT | EXTRA |
216+
// | CURRENT_TIMESTAMP | DEFAULT_GENERATED |
217+
// enclose expression default values within parentheses to distinguish them from literal constant default values,
218+
// with 1 exception: for TIMESTAMP and DATETIME columns, you can specify the CURRENT_TIMESTAMP function as the default, without enclosing parentheses
219+
// 8.0 case 3: CREATE TABLE a (f FLOAT DEFAULT (RAND() * RAND()), j JSON DEFAULT (JSON_ARRAY()));
220+
// |COLUMN_NAME | COLUMN_DEFAULT | EXTRA |
221+
// |f | (rand() * rand()) | DEFAULT_GENERATED |
222+
// |j | json_array() | DEFAULT_GENERATED |
223+
if extra.starts_with("DEFAULT_GENERATED") || extra.to_lowercase().contains("on update") {
224+
if str.to_uppercase().eq("CURRENT_TIMESTAMP")
225+
|| (str.starts_with("(") && str.ends_with(")"))
226+
{
227+
return Ok(ColumnDefault::Expression(str));
228+
} else {
229+
return Ok(ColumnDefault::Expression(format!("({})", str)));
230+
}
231+
}
232+
233+
let tb_meta = self.meta_manager.get_tb_meta(schema, tb).await?;
234+
let col_type = tb_meta.get_col_type(col)?;
235+
// 5.7: the default value specified in a DEFAULT clause must be a literal constant;
236+
// it cannot be a function or an expression. This means, for example,
237+
// that you cannot set the default for a date column to be the value of a function
238+
// such as NOW() or CURRENT_DATE. The exception is that, for TIMESTAMP and DATETIME columns,
239+
// you can specify CURRENT_TIMESTAMP as the default.
240+
// 8.0: function or expression will also cause EXTRA to be 'DEFAULT_GENERATED'
241+
if str.to_uppercase().eq("CURRENT_TIMESTAMP") {
242+
if matches!(
243+
col_type,
244+
MysqlColType::DateTime { .. } | MysqlColType::Timestamp { .. }
245+
) {
246+
return Ok(ColumnDefault::Expression(str));
247+
}
248+
}
249+
250+
Ok(ColumnDefault::Literal(str))
251+
}
252+
187253
async fn get_indexes(&mut self, tb: &str) -> anyhow::Result<HashMap<String, Vec<Index>>> {
188254
let mut index_map: HashMap<(String, String), Index> = HashMap::new();
189255

dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use dt_common::meta::struct_meta::{
77
pg_create_table_statement::PgCreateTableStatement,
88
},
99
structure::{
10-
column::Column,
10+
column::{Column, ColumnDefault},
1111
comment::{Comment, CommentType},
1212
constraint::{Constraint, ConstraintType},
1313
index::{Index, IndexKind},
@@ -204,7 +204,7 @@ impl PgStructFetcher {
204204

205205
let mut independent_squence_names = Vec::new();
206206
for column in table.columns.iter() {
207-
if let Some(default_value) = &column.column_default {
207+
if let Some(ColumnDefault::Literal(default_value)) = &column.column_default {
208208
let (schema, sequence_name) =
209209
Self::get_sequence_name_by_default_value(default_value);
210210
// example, default_value is 'Standard'::text
@@ -355,10 +355,15 @@ impl PgStructFetcher {
355355
let identity_generation = row.get("identity_generation");
356356
let generation_rule = Self::get_col_generation_rule(is_identity, identity_generation);
357357
let is_nullable = Self::get_str_with_null(&row, "is_nullable")?.to_lowercase() == "yes";
358+
let column_default = if let Some(str) = row.get("column_default") {
359+
Some(ColumnDefault::Literal(str))
360+
} else {
361+
None
362+
};
358363
let column = Column {
359364
column_name: Self::get_str_with_null(&row, "column_name")?,
360365
ordinal_position: ordinal_position as u32,
361-
column_default: row.get("column_default"),
366+
column_default,
362367
is_nullable,
363368
generated: generation_rule,
364369
..Default::default()

dt-tests/tests/.env

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ mysql_sinker_url=mysql://root:123456@127.0.0.1:3308?ssl-mode=disabled
44
# mysql meta center
55
mysql_meta_center_url=mysql://root:123456@127.0.0.1:3309?ssl-mode=disabled
66

7+
# mysql 8.0
8+
mysql_extractor_url_8_0=mysql://root:123456@127.0.0.1:3311?ssl-mode=disabled
9+
mysql_sinker_url_8_0=mysql://root:123456@127.0.0.1:3312?ssl-mode=disabled
10+
711
# mysql sensitive
812
case_sensitive_mysql_extractor_url=mysql://root:123456@127.0.0.1:3311?ssl-mode=disabled
913
case_sensitive_mysql_sinker_url=mysql://root:123456@127.0.0.1:3312?ssl-mode=disabled
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1+
key: table.struct_check_test_1.not_match_column, src_sql: CREATE TABLE IF NOT EXISTS `struct_check_test_1`.`not_match_column` (`id` int(10) unsigned auto_increment NOT NULL, `varchar_col` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `char_col` char(10) CHARACTER SET utf8 COLLATE utf8_general_ci NULL, `text_col` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL, `tinyint_col` tinyint(4) DEFAULT '0' NULL, `smallint_col` smallint(6) NULL, `mediumint_col` mediumint(9) NULL, `int_col` int(11) NULL, `bigint_col` bigint(20) NULL, `float_col` float(8,2) NULL, `double_col` double(16,4) NULL, `decimal_col` decimal(10,2) NULL, `date_col` date NULL, `datetime_col` datetime NULL, `timestamp_col` timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL, `time_col` time NULL, `year_col` year(4) NULL, `binary_col` binary(16) NULL, `varbinary_col` varbinary(255) NULL, `blob_col` blob NULL, `tinyblob_col` tinyblob NULL, `mediumblob_col` mediumblob NULL, `longblob_col` longblob NULL, `enum_col` enum('value1','value2','value3') CHARACTER SET utf8 COLLATE utf8_general_ci NULL, `set_col` set('option1','option2','option3') CHARACTER SET utf8 COLLATE utf8_general_ci NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci
2+
key: table.struct_check_test_1.not_match_column, dst_sql: CREATE TABLE IF NOT EXISTS `struct_check_test_1`.`not_match_column` (`id` int(10) unsigned auto_increment NOT NULL, `char_col` char(10) CHARACTER SET utf8 COLLATE utf8_general_ci NULL, `text_col` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL, `tinyint_col` tinyint(4) DEFAULT '0' NULL, `smallint_col` smallint(6) NULL, `mediumint_col` mediumint(9) NULL, `int_col` int(11) NULL, `bigint_col` bigint(20) NULL, `float_col` float(8,2) NULL, `double_col` double(16,4) NULL, `decimal_col` decimal(10,2) NULL, `datetime_col` datetime NULL, `timestamp_col` timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL, `time_col` time NULL, `year_col` year(4) NULL, `binary_col` binary(16) NULL, `varbinary_col` varbinary(255) NULL, `blob_col` blob NULL, `tinyblob_col` tinyblob NULL, `mediumblob_col` mediumblob NULL, `longblob_col` longblob NULL, `enum_col` enum('value1','value2','value3') CHARACTER SET utf8 COLLATE utf8_general_ci NULL, `set_col` set('option1','option2','option3') CHARACTER SET utf8 COLLATE utf8_general_ci NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci
13
key: index.struct_check_test_1.not_match_index.i4_diff_order, src_sql: CREATE INDEX `i4_diff_order` ON `struct_check_test_1`.`not_match_index` (`composite_index_col2`,`composite_index_col1`,`composite_index_col3`)
24
key: index.struct_check_test_1.not_match_index.i4_diff_order, dst_sql: CREATE INDEX `i4_diff_order` ON `struct_check_test_1`.`not_match_index` (`composite_index_col3`,`composite_index_col2`,`composite_index_col1`)
3-
key: table.struct_check_test_1.not_match_column, src_sql: CREATE TABLE IF NOT EXISTS `struct_check_test_1`.`not_match_column` (`id` int(10) unsigned auto_increment NOT NULL ,`varchar_col` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL ,`char_col` char(10) CHARACTER SET utf8 COLLATE utf8_general_ci NULL ,`text_col` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL ,`tinyint_col` tinyint(4) DEFAULT '0' NULL ,`smallint_col` smallint(6) NULL ,`mediumint_col` mediumint(9) NULL ,`int_col` int(11) NULL ,`bigint_col` bigint(20) NULL ,`float_col` float(8,2) NULL ,`double_col` double(16,4) NULL ,`decimal_col` decimal(10,2) NULL ,`date_col` date NULL ,`datetime_col` datetime NULL ,`timestamp_col` timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL ,`time_col` time NULL ,`year_col` year(4) NULL ,`binary_col` binary(16) NULL ,`varbinary_col` varbinary(255) NULL ,`blob_col` blob NULL ,`tinyblob_col` tinyblob NULL ,`mediumblob_col` mediumblob NULL ,`longblob_col` longblob NULL ,`enum_col` enum('value1','value2','value3') CHARACTER SET utf8 COLLATE utf8_general_ci NULL ,`set_col` set('option1','option2','option3') CHARACTER SET utf8 COLLATE utf8_general_ci NULL , PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci
4-
key: table.struct_check_test_1.not_match_column, dst_sql: CREATE TABLE IF NOT EXISTS `struct_check_test_1`.`not_match_column` (`id` int(10) unsigned auto_increment NOT NULL ,`char_col` char(10) CHARACTER SET utf8 COLLATE utf8_general_ci NULL ,`text_col` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL ,`tinyint_col` tinyint(4) DEFAULT '0' NULL ,`smallint_col` smallint(6) NULL ,`mediumint_col` mediumint(9) NULL ,`int_col` int(11) NULL ,`bigint_col` bigint(20) NULL ,`float_col` float(8,2) NULL ,`double_col` double(16,4) NULL ,`decimal_col` decimal(10,2) NULL ,`datetime_col` datetime NULL ,`timestamp_col` timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL ,`time_col` time NULL ,`year_col` year(4) NULL ,`binary_col` binary(16) NULL ,`varbinary_col` varbinary(255) NULL ,`blob_col` blob NULL ,`tinyblob_col` tinyblob NULL ,`mediumblob_col` mediumblob NULL ,`longblob_col` longblob NULL ,`enum_col` enum('value1','value2','value3') CHARACTER SET utf8 COLLATE utf8_general_ci NULL ,`set_col` set('option1','option2','option3') CHARACTER SET utf8 COLLATE utf8_general_ci NULL , PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
[("table.struct_check_test_1.not_match_miss", "CREATE TABLE IF NOT EXISTS `struct_check_test_1`.`not_match_miss` (`id` int(11) NOT NULL ,`text` varchar(10) CHARACTER SET utf8 COLLATE utf8_general_ci NULL , PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci")]
1+
[("table.struct_check_test_1.not_match_miss", "CREATE TABLE IF NOT EXISTS `struct_check_test_1`.`not_match_miss` (`id` int(11) NOT NULL, `text` varchar(10) CHARACTER SET utf8 COLLATE utf8_general_ci NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci")]
22
key: index.struct_check_test_1.not_match_index.i6_miss, src_sql: CREATE INDEX `i6_miss` ON `struct_check_test_1`.`not_match_index` (`index_col`)
33
key: index.struct_check_test_1.not_match_index.i5_diff_name_src, src_sql: CREATE INDEX `i5_diff_name_src` ON `struct_check_test_1`.`not_match_index` (`index_col`)

0 commit comments

Comments
 (0)