From cdcd69f361645df655ff7503bafe81f3d16786d5 Mon Sep 17 00:00:00 2001 From: penggan Date: Wed, 27 Nov 2024 18:01:13 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E4=BA=86=E7=81=AB?= =?UTF-8?q?=E5=B1=B1=E5=BC=95=E6=93=8Ebytehouse=E7=9A=84reader=E5=92=8Cwri?= =?UTF-8?q?ter=E6=8F=92=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bytehousereader/pom.xml | 12 + bytehousereader/src/main/assembly/package.xml | 35 ++ .../bytehousereader/BytehouseReader.java | 78 +++++ .../src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 16 + bytehousewriter/pom.xml | 12 + bytehousewriter/src/main/assembly/package.xml | 35 ++ .../bytehousewriter/BytehouseWriter.java | 324 ++++++++++++++++++ .../BytehouseWriterErrorCode.java | 31 ++ .../src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 21 ++ 11 files changed, 576 insertions(+) create mode 100644 bytehousereader/pom.xml create mode 100644 bytehousereader/src/main/assembly/package.xml create mode 100644 bytehousereader/src/main/java/com/alibaba/datax/plugin/reader/bytehousereader/BytehouseReader.java create mode 100644 bytehousereader/src/main/resources/plugin.json create mode 100644 bytehousereader/src/main/resources/plugin_job_template.json create mode 100644 bytehousewriter/pom.xml create mode 100644 bytehousewriter/src/main/assembly/package.xml create mode 100644 bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriter.java create mode 100644 bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriterErrorCode.java create mode 100644 bytehousewriter/src/main/resources/plugin.json create mode 100644 bytehousewriter/src/main/resources/plugin_job_template.json diff --git a/bytehousereader/pom.xml b/bytehousereader/pom.xml new file mode 100644 index 0000000000..8aad5ca533 --- /dev/null +++ b/bytehousereader/pom.xml @@ -0,0 +1,12 @@ + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + bytehousereader + Archetype - bytehousereader + http://maven.apache.org + diff --git a/bytehousereader/src/main/assembly/package.xml b/bytehousereader/src/main/assembly/package.xml new file mode 100644 index 0000000000..9f4a894c74 --- /dev/null +++ b/bytehousereader/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/bytehousereader + + + target/ + + bytehousereader-0.0.1-SNAPSHOT.jar + + plugin/reader/bytehousereader + + + + + + false + plugin/reader/bytehousereader/libs + runtime + + + \ No newline at end of file diff --git a/bytehousereader/src/main/java/com/alibaba/datax/plugin/reader/bytehousereader/BytehouseReader.java b/bytehousereader/src/main/java/com/alibaba/datax/plugin/reader/bytehousereader/BytehouseReader.java new file mode 100644 index 0000000000..8dd7a5c675 --- /dev/null +++ b/bytehousereader/src/main/java/com/alibaba/datax/plugin/reader/bytehousereader/BytehouseReader.java @@ -0,0 +1,78 @@ +package com.alibaba.datax.plugin.reader.bytehousereader; + +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; +import com.alibaba.datax.plugin.rdbms.reader.Constant; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class BytehouseReader { + + + private static final DataBaseType DATABASE_TYPE = DataBaseType.ByteHouse; + private static final Logger LOG = LoggerFactory.getLogger(BytehouseReader.class); + + public static class Job extends Reader.Job { + private Configuration jobConfig = null; + private CommonRdbmsReader.Job commonRdbmsReaderMaster; + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + int fetchSize = this.jobConfig.getInt(Constant.FETCH_SIZE, Integer.MIN_VALUE); + this.jobConfig.set(Constant.FETCH_SIZE, fetchSize); + this.commonRdbmsReaderMaster = new CommonRdbmsReader.Job(DATABASE_TYPE); + this.commonRdbmsReaderMaster.init(this.jobConfig); + } + + @Override + public List split(int mandatoryNumber) { + return this.commonRdbmsReaderMaster.split(this.jobConfig, mandatoryNumber); + } + + @Override + public void post() { + this.commonRdbmsReaderMaster.post(this.jobConfig); + } + + @Override + public void destroy() { + this.commonRdbmsReaderMaster.destroy(this.jobConfig); + } + } + + public static class Task extends Reader.Task { + + private Configuration jobConfig; + private CommonRdbmsReader.Task commonRdbmsReaderSlave; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + this.commonRdbmsReaderSlave = new CommonRdbmsReader.Task(DATABASE_TYPE, super.getTaskGroupId(), super.getTaskId()); + this.commonRdbmsReaderSlave.init(this.jobConfig); + } + + @Override + public void startRead(RecordSender recordSender) { + int fetchSize = this.jobConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE, 1000); + + this.commonRdbmsReaderSlave.startRead(this.jobConfig, recordSender, super.getTaskPluginCollector(), fetchSize); + } + + @Override + public void post() { + this.commonRdbmsReaderSlave.post(this.jobConfig); + } + + @Override + public void destroy() { + this.commonRdbmsReaderSlave.destroy(this.jobConfig); + } + } + +} diff --git a/bytehousereader/src/main/resources/plugin.json b/bytehousereader/src/main/resources/plugin.json new file mode 100644 index 0000000000..d6064c11fd --- /dev/null +++ b/bytehousereader/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "bytehousereader", + "class": "com.alibaba.datax.plugin.reader.bytehousereader.BytehouseReader", + "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql.", + "developer": "alibaba" +} \ No newline at end of file diff --git a/bytehousereader/src/main/resources/plugin_job_template.json b/bytehousereader/src/main/resources/plugin_job_template.json new file mode 100644 index 0000000000..f03a4f97b5 --- /dev/null +++ b/bytehousereader/src/main/resources/plugin_job_template.json @@ -0,0 +1,16 @@ +{ + "name": "bytehousereader", + "parameter": { + "username": "username", + "password": "password", + "column": ["col1", "col2", "col3"], + "connection": [ + { + "jdbcUrl": "jdbc:bytehouse://:[/]", + "table": ["table1", "table2"] + } + ], + "preSql": [], + "postSql": [] + } +} \ No newline at end of file diff --git a/bytehousewriter/pom.xml b/bytehousewriter/pom.xml new file mode 100644 index 0000000000..2e26040f2f --- /dev/null +++ b/bytehousewriter/pom.xml @@ -0,0 +1,12 @@ + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + bytehousewriter + Archetype - bytehousewriter + http://maven.apache.org + diff --git a/bytehousewriter/src/main/assembly/package.xml b/bytehousewriter/src/main/assembly/package.xml new file mode 100644 index 0000000000..17d5c82e6d --- /dev/null +++ b/bytehousewriter/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/bytehousewriter + + + target/ + + bytehousewriter-0.0.1-SNAPSHOT.jar + + plugin/writer/bytehousewriter + + + + + + false + plugin/writer/bytehousewriter/libs + runtime + + + diff --git a/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriter.java b/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriter.java new file mode 100644 index 0000000000..48ffec6167 --- /dev/null +++ b/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriter.java @@ -0,0 +1,324 @@ +package com.alibaba.datax.plugin.writer.bytehousewriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.exception.CommonErrorCode; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; + +import java.sql.*; +import java.util.List; +import java.util.regex.Pattern; + +public class BytehouseWriter extends Writer { + private static final DataBaseType DATABASE_TYPE = DataBaseType.ByteHouse; + + public static class Job extends Writer.Job { + private Configuration originalConfig = null; + private CommonRdbmsWriter.Job commonRdbmsWriterMaster; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE); + this.commonRdbmsWriterMaster.init(this.originalConfig); + } + + @Override + public void prepare() { + this.commonRdbmsWriterMaster.prepare(this.originalConfig); + } + + @Override + public List split(int mandatoryNumber) { + return this.commonRdbmsWriterMaster.split(this.originalConfig, mandatoryNumber); + } + + @Override + public void post() { + this.commonRdbmsWriterMaster.post(this.originalConfig); + } + + @Override + public void destroy() { + this.commonRdbmsWriterMaster.destroy(this.originalConfig); + } + } + + public static class Task extends Writer.Task { + private Configuration writerSliceConfig; + + private CommonRdbmsWriter.Task commonRdbmsWriterSlave; + + @Override + public void init() { + this.writerSliceConfig = super.getPluginJobConf(); + + this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE) { + @Override + protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, String typeName, Column column) throws SQLException { + try { + if (column.getRawData() == null) { + preparedStatement.setNull(columnIndex + 1, columnSqltype); + return preparedStatement; + } + + java.util.Date utilDate; + switch (columnSqltype) { + case Types.CHAR: + case Types.NCHAR: + case Types.CLOB: + case Types.NCLOB: + case Types.VARCHAR: + case Types.LONGVARCHAR: + case Types.NVARCHAR: + case Types.LONGNVARCHAR: + preparedStatement.setString(columnIndex + 1, column + .asString()); + break; + + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + case Types.DECIMAL: + case Types.FLOAT: + case Types.REAL: + case Types.DOUBLE: + String strValue = column.asString(); + if (emptyAsNull && "".equals(strValue)) { + preparedStatement.setNull(columnIndex + 1, columnSqltype); + } else { + switch (columnSqltype) { + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); + break; + case Types.BIGINT: + preparedStatement.setLong(columnIndex + 1, column.asLong()); + break; + case Types.DECIMAL: + preparedStatement.setBigDecimal(columnIndex + 1, column.asBigDecimal()); + break; + case Types.REAL: + case Types.FLOAT: + preparedStatement.setFloat(columnIndex + 1, column.asDouble().floatValue()); + break; + case Types.DOUBLE: + preparedStatement.setDouble(columnIndex + 1, column.asDouble()); + break; + } + } + break; + + case Types.DATE: + if (this.resultSetMetaData.getRight().get(columnIndex) + .equalsIgnoreCase("year")) { + if (column.asBigInteger() == null) { + preparedStatement.setString(columnIndex + 1, null); + } else { + preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); + } + } else { + java.sql.Date sqlDate = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlDate = new java.sql.Date(utilDate.getTime()); + } + preparedStatement.setDate(columnIndex + 1, sqlDate); + } + break; + + case Types.TIME: + java.sql.Time sqlTime = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlTime = new java.sql.Time(utilDate.getTime()); + } + preparedStatement.setTime(columnIndex + 1, sqlTime); + break; + + case Types.TIMESTAMP: + Timestamp sqlTimestamp = null; + if (column instanceof StringColumn && column.asString() != null) { + String timeStampStr = column.asString(); + // JAVA TIMESTAMP 类型入参必须是 "2017-07-12 14:39:00.123566" 格式 + String pattern = "^\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+"; + boolean isMatch = Pattern.matches(pattern, timeStampStr); + if (isMatch) { + sqlTimestamp = Timestamp.valueOf(timeStampStr); + preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); + break; + } + } + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlTimestamp = new Timestamp( + utilDate.getTime()); + } + preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); + break; + + case Types.BINARY: + case Types.VARBINARY: + case Types.BLOB: + case Types.LONGVARBINARY: + preparedStatement.setBytes(columnIndex + 1, column + .asBytes()); + break; + + case Types.BOOLEAN: + preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); + break; + + // warn: bit(1) -> Types.BIT 可使用setBoolean + // warn: bit(>1) -> Types.VARBINARY 可使用setBytes + case Types.BIT: + if (this.dataBaseType == DataBaseType.MySql) { + Boolean asBoolean = column.asBoolean(); + if (asBoolean != null) { + preparedStatement.setBoolean(columnIndex + 1, asBoolean); + } else { + preparedStatement.setNull(columnIndex + 1, Types.BIT); + } + } else { + preparedStatement.setString(columnIndex + 1, column.asString()); + } + break; + + default: + boolean isHandled = fillPreparedStatementColumnType4CustomType(preparedStatement, + columnIndex, columnSqltype, column); + if (isHandled) { + break; + } + throw DataXException + .asDataXException( + DBUtilErrorCode.UNSUPPORTED_TYPE, + String.format( + "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.", + this.resultSetMetaData.getLeft() + .get(columnIndex), + this.resultSetMetaData.getMiddle() + .get(columnIndex), + this.resultSetMetaData.getRight() + .get(columnIndex))); + } + return preparedStatement; + } catch (DataXException e) { + // fix类型转换或者溢出失败时,将具体哪一列打印出来 + if (e.getErrorCode() == CommonErrorCode.CONVERT_NOT_SUPPORT || + e.getErrorCode() == CommonErrorCode.CONVERT_OVER_FLOW) { + throw DataXException + .asDataXException( + e.getErrorCode(), + String.format( + "类型转化错误. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.", + this.resultSetMetaData.getLeft() + .get(columnIndex), + this.resultSetMetaData.getMiddle() + .get(columnIndex), + this.resultSetMetaData.getRight() + .get(columnIndex))); + } else { + throw e; + } + } + } + + private Object toJavaArray(Object val) { + if (null == val) { + return null; + } else if (val instanceof JSONArray) { + Object[] valArray = ((JSONArray) val).toArray(); + for (int i = 0; i < valArray.length; i++) { + valArray[i] = this.toJavaArray(valArray[i]); + } + return valArray; + } else { + return val; + } + } + + boolean fillPreparedStatementColumnType4CustomType(PreparedStatement ps, + int columnIndex, int columnSqltype, + Column column) throws SQLException { + switch (columnSqltype) { + case Types.OTHER: + if (this.resultSetMetaData.getRight().get(columnIndex).startsWith("Tuple")) { + throw DataXException + .asDataXException(BytehouseWriterErrorCode.TUPLE_NOT_SUPPORTED_ERROR, BytehouseWriterErrorCode.TUPLE_NOT_SUPPORTED_ERROR.getDescription()); + } else { + ps.setString(columnIndex + 1, column.asString()); + } + return true; + + case Types.ARRAY: + Connection conn = ps.getConnection(); + List values = JSON.parseArray(column.asString(), Object.class); + for (int i = 0; i < values.size(); i++) { + values.set(i, this.toJavaArray(values.get(i))); + } + Array array = conn.createArrayOf("String", values.toArray()); + ps.setArray(columnIndex + 1, array); + return true; + + default: + break; + } + + return false; + } + }; + + this.commonRdbmsWriterSlave.init(this.writerSliceConfig); + } + + @Override + public void prepare() { + this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig); + } + + @Override + public void startWrite(RecordReceiver recordReceiver) { + this.commonRdbmsWriterSlave.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector()); + } + + @Override + public void post() { + this.commonRdbmsWriterSlave.post(this.writerSliceConfig); + } + + @Override + public void destroy() { + this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig); + } + } + +} \ No newline at end of file diff --git a/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriterErrorCode.java b/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriterErrorCode.java new file mode 100644 index 0000000000..fb27dc7a53 --- /dev/null +++ b/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriterErrorCode.java @@ -0,0 +1,31 @@ +package com.alibaba.datax.plugin.writer.bytehousewriter; + +import com.alibaba.datax.common.spi.ErrorCode; + +public enum BytehouseWriterErrorCode implements ErrorCode { + TUPLE_NOT_SUPPORTED_ERROR("clickhouseWriter-00", "不支持TUPLE类型导入."), + ; + + private final String code; + private final String description; + + private BytehouseWriterErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, this.description); + } +} diff --git a/bytehousewriter/src/main/resources/plugin.json b/bytehousewriter/src/main/resources/plugin.json new file mode 100644 index 0000000000..06d3f53f25 --- /dev/null +++ b/bytehousewriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "bytehousewriter", + "class": "com.alibaba.datax.plugin.writer.bytehousewriter.BytehouseWriter", + "description": "useScene: prod. mechanism: Jdbc connection using the database, execute insert sql.", + "developer": "penggan(1466927252@qq.com)" +} \ No newline at end of file diff --git a/bytehousewriter/src/main/resources/plugin_job_template.json b/bytehousewriter/src/main/resources/plugin_job_template.json new file mode 100644 index 0000000000..2e1ceed022 --- /dev/null +++ b/bytehousewriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,21 @@ +{ + "name": "clickhousewriter", + "parameter": { + "username": "username", + "password": "password", + "column": ["col1", "col2", "col3"], + "connection": [ + { + "jdbcUrl": "jdbc:clickhouse://:[/]", + "table": ["table1", "table2"] + } + ], + "preSql": [], + "postSql": [], + + "batchSize": 65536, + "batchByteSize": 134217728, + "dryRun": false, + "writeMode": "insert" + } +} \ No newline at end of file From ed1facf9905190f8797e023fb8abb0199551b965 Mon Sep 17 00:00:00 2001 From: penggan Date: Wed, 27 Nov 2024 18:21:19 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=86plugin=5Fjob=5Ft?= =?UTF-8?q?emplate.json=20=E6=A8=A1=E6=9D=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bytehousereader/pom.xml | 91 ++++++++++++++++++- bytehousewriter/pom.xml | 90 +++++++++++++++++- .../bytehousewriter/BytehouseWriter.java | 8 +- .../main/resources/plugin_job_template.json | 2 +- package.xml | 14 +++ pom.xml | 8 ++ 6 files changed, 202 insertions(+), 11 deletions(-) diff --git a/bytehousereader/pom.xml b/bytehousereader/pom.xml index 8aad5ca533..b77c126bb8 100644 --- a/bytehousereader/pom.xml +++ b/bytehousereader/pom.xml @@ -1,12 +1,97 @@ - 4.0.0 com.alibaba.datax datax-all 0.0.1-SNAPSHOT + 4.0.0 bytehousereader - Archetype - bytehousereader - http://maven.apache.org + bytehousereader + jar + + 8 + 8 + 1.1.58 + + + + + + com.bytedance.bytehouse + driver-java + ${bytehouse.version} + all + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + org.slf4j + slf4j-api + + + org.projectlombok + lombok + 1.18.20 + + + ch.qos.logback + logback-classic + + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + diff --git a/bytehousewriter/pom.xml b/bytehousewriter/pom.xml index 2e26040f2f..67b33bb126 100644 --- a/bytehousewriter/pom.xml +++ b/bytehousewriter/pom.xml @@ -1,12 +1,96 @@ - 4.0.0 com.alibaba.datax datax-all 0.0.1-SNAPSHOT + 4.0.0 + jar bytehousewriter - Archetype - bytehousewriter - http://maven.apache.org + bytehousewriter + + 8 + 8 + 1.1.58 + + + + + com.bytedance.bytehouse + driver-java + ${bytehouse.version} + all + + + org.projectlombok + lombok + 1.18.20 + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + org.slf4j + slf4j-api + + + + ch.qos.logback + logback-classic + + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + diff --git a/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriter.java b/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriter.java index 48ffec6167..2ef8227cd7 100644 --- a/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriter.java +++ b/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriter.java @@ -128,7 +128,7 @@ protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement pr preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); } } else { - java.sql.Date sqlDate = null; + Date sqlDate = null; try { utilDate = column.asDate(); } catch (DataXException e) { @@ -137,14 +137,14 @@ protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement pr } if (null != utilDate) { - sqlDate = new java.sql.Date(utilDate.getTime()); + sqlDate = new Date(utilDate.getTime()); } preparedStatement.setDate(columnIndex + 1, sqlDate); } break; case Types.TIME: - java.sql.Time sqlTime = null; + Time sqlTime = null; try { utilDate = column.asDate(); } catch (DataXException e) { @@ -153,7 +153,7 @@ protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement pr } if (null != utilDate) { - sqlTime = new java.sql.Time(utilDate.getTime()); + sqlTime = new Time(utilDate.getTime()); } preparedStatement.setTime(columnIndex + 1, sqlTime); break; diff --git a/bytehousewriter/src/main/resources/plugin_job_template.json b/bytehousewriter/src/main/resources/plugin_job_template.json index 2e1ceed022..a61953371d 100644 --- a/bytehousewriter/src/main/resources/plugin_job_template.json +++ b/bytehousewriter/src/main/resources/plugin_job_template.json @@ -6,7 +6,7 @@ "column": ["col1", "col2", "col3"], "connection": [ { - "jdbcUrl": "jdbc:clickhouse://:[/]", + "jdbcUrl": "jdbc:bytehouse://:[/]", "table": ["table1", "table2"] } ], diff --git a/package.xml b/package.xml index 624109f799..e02a90f94e 100644 --- a/package.xml +++ b/package.xml @@ -25,6 +25,13 @@ + + bytehousereader/target/datax/ + + **/*.* + + datax + mysqlreader/target/datax/ @@ -259,6 +266,13 @@ + + bytehousewriter/target/datax/ + + **/*.* + + datax + mysqlwriter/target/datax/ diff --git a/pom.xml b/pom.xml index c7f43f1725..20d68c0c4a 100644 --- a/pom.xml +++ b/pom.xml @@ -135,6 +135,8 @@ gaussdbreader gaussdbwriter datax-example + bytehousewriter + bytehousereader @@ -248,6 +250,12 @@ true + + + bytedance + ByteDance Public Repository + https://artifact.bytedance.com/repository/releases +