diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md new file mode 100644 index 0000000..b165832 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md @@ -0,0 +1,150 @@ +# FlinkCDC SQL Server source example + +This example shows how to capture data from a database (SQL Server in this case) directly from Flink, using a Flink CDC source connector. + +* Flink version: 1.20 +* Flink API: SQL +* Language: Java (11) +* Flink connectors: Flink CDC SQL Server source (3.4), JDBC sink + +The job is implemented in SQL embedded in Java. +It uses the [Flink CDC SQL Server source connector](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/sqlserver-cdc/) +to capture changes from a database. +Changes are propagated to the destination database using the [JDBC Sink connector](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/jdbc/). + +### Source database + +This example uses Microsoft SQL Server as the source database. To use a different database as the CDC source, you need to switch to a different Flink CDC source connector. + +Different Flink CDC sources require different configurations and support different metadata fields. To switch the source to a different database, you need to modify the code. + +See the [Flink CDC Sources documentation](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/sqlserver-cdc) for further details. + + +### Destination database + +Note that the JDBC sink is agnostic to the actual destination database technology. +This example is tested with both MySQL and PostgreSQL, but can easily be adjusted to other databases. + +The `url` property in the `JdbcSink` configuration group determines the destination database (see [Runtime configuration](#runtime-configuration), below). +The corresponding JDBC driver must be included in the `pom.xml`. This example includes both MySQL and PostgreSQL drivers.
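+
+For illustration, switching between the two tested destination databases only requires changing the `url` value. The hosts, ports and database names below are assumptions matching the local Docker Compose setup:
+
+```
+# PostgreSQL destination
+url = jdbc:postgresql://localhost:5432/targetdb
+
+# MySQL destination
+url = jdbc:mysql://localhost:3306/targetdb
+```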
+ +### Testing with local databases using Docker Compose + +This example can be run locally using Docker. + +A [Docker Compose file](./docker/docker-compose.yml) is provided to run local SQL Server, MySQL and PostgreSQL databases. +The local databases are initialized by creating users, databases and tables. Some initial records are also inserted into the source table. + +You can run the Flink application inside your IDE following the instructions in [Running in IntelliJ](#running-in-intellij). +The default local configuration connects to the local PostgreSQL database defined in Docker Compose. + +To start the local databases, run `docker compose up -d` in the `./docker` folder. + +Use `docker compose down -v` to shut them down, also removing the data volumes. + + +### Database prerequisites + +When running on Amazon Managed Service for Apache Flink with databases on AWS, you need to set up the databases manually, ensuring all of the following are in place: + +> You can find the SQL scripts that set up the dockerized databases by checking out the init scripts for +> [SQL Server](docker/sqlserver-init/init.sql), [MySQL](docker/mysql-init/init.sql), +> and [PostgreSQL](docker/postgres-init/init.sql). + +1. **Source database (Microsoft SQL Server)** + 1. SQL Server Agent must be running + 2. Native (user/password) authentication must be enabled + 3. The login used by Flink CDC (e.g. `flink_cdc`) must be `db_owner` for the database + 4. The source database and table must match the `database.name` and `table.name` you specify in the source configuration (e.g. `SampleDataPlatform` and `Customers`) + 5. The source table must have this schema: + ```sql + CREATE TABLE [dbo].[Customers] + ( + [CustomerID] [int] IDENTITY (1,1) NOT NULL, + [FirstName] [nvarchar](40) NOT NULL, + [MiddleInitial] [nvarchar](40) NULL, + [LastName] [nvarchar](40) NOT NULL, + [mail] [varchar](50) NULL, + CONSTRAINT [CustomerPK] PRIMARY KEY CLUSTERED ([CustomerID] ASC) + ) ON [PRIMARY]; + ``` + 6.
CDC must be enabled on the source database. On Amazon RDS for SQL Server, use the following stored procedure: + ```sql + exec msdb.dbo.rds_cdc_enable_db 'MyDB' + ``` + On self-managed SQL Server, call a different procedure while connected to the database: + ```sql + USE MyDB; + EXEC sys.sp_cdc_enable_db; + ``` + 7. CDC must also be enabled on the table: + ```sql + EXEC sys.sp_cdc_enable_table + @source_schema = N'dbo', + @source_name = N'Customers', + @role_name = NULL, + @supports_net_changes = 0; + ``` +2. **Destination database (MySQL or PostgreSQL)** + 1. The destination database name must match the `url` configured in the JDBC sink + 2. The destination table must have the following schema: + ```sql + CREATE TABLE customers ( + customer_id INT PRIMARY KEY, + first_name VARCHAR(40), + middle_initial VARCHAR(40), + last_name VARCHAR(40), + email VARCHAR(50), + _source_updated_at TIMESTAMP, + _change_processed_at TIMESTAMP + ); + ``` + 3. The destination database user must have SELECT, INSERT, UPDATE and DELETE permissions on the destination table + +### Running in IntelliJ + +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. +Run the databases locally using Docker Compose, as described [above](#testing-with-local-databases-using-docker-compose). + +See [Running examples locally](../../running-examples-locally.md) for details about running the application in the IDE. + + +### Running on Amazon Managed Service for Apache Flink + +To run the application in Amazon Managed Service for Apache Flink, make sure the application configuration has the following: +* VPC networking +* The selected subnets can route traffic to both the source and destination databases +* The Security Group allows traffic from the application to both source and destination databases + + +### Runtime configuration + +When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from *Runtime Properties*.
+ +When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder. + +Runtime parameters: + +| Group ID | Key | Description | |-------------|-----------------|----------------------------------------------------------------------------------------------------------------------------| | `CDCSource` | `hostname` | Source database DNS hostname or IP | | `CDCSource` | `port` | Source database port (normally `1433`) | | `CDCSource` | `username` | Source database username. The user must be `db_owner` of the database | | `CDCSource` | `password` | Source database password | | `CDCSource` | `database.name` | Source database name | | `CDCSource` | `table.name` | Source table name, e.g. `dbo.Customers` | | `JdbcSink` | `url` | Destination database JDBC URL, e.g. `jdbc:postgresql://localhost:5432/targetdb`. Note: the URL includes the database name. | | `JdbcSink` | `table.name` | Destination table, e.g. `customers` | | `JdbcSink` | `username` | Destination database user | | `JdbcSink` | `password` | Destination database password | + +### Known limitations + +Using the SQL interface of Flink CDC Sources greatly simplifies the implementation of a passthrough application. +However, schema changes in the source table are ignored.
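+
+For reference, the runtime parameters above combine into the local configuration file format the application expects. A minimal sketch of `flink-application-properties-dev.json`; hostnames and credentials are illustrative placeholders:
+
+```json
+[
+  {
+    "PropertyGroupId": "CDCSource",
+    "PropertyMap": {
+      "hostname": "localhost",
+      "port": "1433",
+      "username": "flink_cdc",
+      "password": "<source db password>",
+      "database.name": "SampleDataPlatform",
+      "table.name": "dbo.Customers"
+    }
+  },
+  {
+    "PropertyGroupId": "JdbcSink",
+    "PropertyMap": {
+      "url": "jdbc:postgresql://localhost:5432/targetdb",
+      "table.name": "customers",
+      "username": "flinkusr",
+      "password": "<destination db password>"
+    }
+  }
+]
+```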
+ +## References + +* [Flink CDC SQL Server documentation](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/sqlserver-cdc) +* [Debezium SQL Server documentation](https://debezium.io/documentation/reference/1.9/connectors/sqlserver.html) \ No newline at end of file diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml new file mode 100644 index 0000000..ca6abb4 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml @@ -0,0 +1,77 @@ +services: + + # Ms SQL Server + init + sqlserver: + image: mcr.microsoft.com/mssql/server:2022-latest + container_name: mssql-server-2022 + environment: + - ACCEPT_EULA=Y + - SA_PASSWORD=YourStrong@Passw0rd + - MSSQL_PID=Developer + - MSSQL_AGENT_ENABLED=true + ports: + - "1433:1433" + volumes: + - sqlserver_data:/var/opt/mssql + - ./sqlserver-init/init.sql:/tmp/init.sql + restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", "/opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P YourStrong@Passw0rd -Q 'SELECT 1' -C"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + + sqlserver-init: + image: mcr.microsoft.com/mssql/server:2022-latest + depends_on: + sqlserver: + condition: service_healthy + volumes: + - ./sqlserver-init/init.sql:/tmp/init.sql + command: > + bash -c " + echo 'Waiting for SQL Server to be ready...' && + sleep 5 && + echo 'Running initialization script...' && + /opt/mssql-tools18/bin/sqlcmd -S sqlserver -U sa -P YourStrong@Passw0rd -i /tmp/init.sql -C && + echo 'Initialization completed!' + " + + # MySQL database + mysql: + image: mysql:8.0 + container_name: mysql_db + restart: always + environment: + MYSQL_ROOT_PASSWORD: R00tpwd! 
+ MYSQL_DATABASE: targetdb + ports: + - "3306:3306" + volumes: + - mysql_data:/var/lib/mysql + - ./mysql-init:/docker-entrypoint-initdb.d + command: --default-authentication-plugin=mysql_native_password + + # PostgreSQL database + postgres: + image: postgres:15 + container_name: postgres_db + restart: always + environment: + POSTGRES_DB: targetdb + POSTGRES_USER: flinkusr + POSTGRES_PASSWORD: PassW0rd! + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + - ./postgres-init:/docker-entrypoint-initdb.d + +volumes: + sqlserver_data: + driver: local + mysql_data: + driver: local + postgres_data: + driver: local diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/mysql-init/init.sql b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/mysql-init/init.sql new file mode 100644 index 0000000..211e432 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/mysql-init/init.sql @@ -0,0 +1,15 @@ +CREATE USER 'flinkusr'@'%' IDENTIFIED BY 'PassW0rd!'; +GRANT SELECT, INSERT, UPDATE, DELETE, SHOW DATABASES ON *.* TO 'flinkusr'@'%'; + +FLUSH PRIVILEGES; + +-- Create customer table +CREATE TABLE customers ( + customer_id INT PRIMARY KEY, + first_name VARCHAR(40), + middle_initial VARCHAR(40), + last_name VARCHAR(40), + email VARCHAR(50), + _source_updated_at TIMESTAMP, + _change_processed_at TIMESTAMP +); diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/postgres-init/init.sql b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/postgres-init/init.sql new file mode 100644 index 0000000..05e9051 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/postgres-init/init.sql @@ -0,0 +1,10 @@ +-- Create customer table +CREATE TABLE customers ( + customer_id INT PRIMARY KEY, + first_name VARCHAR(40), + middle_initial VARCHAR(40), + last_name VARCHAR(40), + email VARCHAR(50), + _source_updated_at TIMESTAMP, + _change_processed_at TIMESTAMP +); diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/sqlserver-init/init.sql 
b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/sqlserver-init/init.sql new file mode 100644 index 0000000..4fb43a9 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/sqlserver-init/init.sql @@ -0,0 +1,56 @@ +-- Create SampleDataPlatform database +CREATE DATABASE SampleDataPlatform; +GO + +-- Use the SampleDataPlatform database +USE SampleDataPlatform; +GO + +-- Create login for flink_cdc +CREATE LOGIN flink_cdc WITH PASSWORD = 'FlinkCDC@123'; +GO + +-- Create user in SampleDataPlatform database +CREATE USER flink_cdc FOR LOGIN flink_cdc; +GO + +-- Grant necessary permissions for CDC operations +ALTER ROLE db_owner ADD MEMBER flink_cdc; +GO + +-- Enable CDC on the SampleDataPlatform database +USE SampleDataPlatform; +EXEC sys.sp_cdc_enable_db; +GO + +-- Create Customers table with the specified schema +CREATE TABLE [dbo].[Customers] +( + [CustomerID] [int] IDENTITY (1,1) NOT NULL, + [FirstName] [nvarchar](40) NOT NULL, + [MiddleInitial] [nvarchar](40) NULL, + [LastName] [nvarchar](40) NOT NULL, + [mail] [varchar](50) NULL, + CONSTRAINT [CustomerPK] PRIMARY KEY CLUSTERED ([CustomerID] ASC) + WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY] +) ON [PRIMARY]; +GO + +-- Enable CDC on the Customers table +EXEC sys.sp_cdc_enable_table + @source_schema = N'dbo', + @source_name = N'Customers', + @role_name = NULL, + @supports_net_changes = 0; +GO + +-- Insert some sample data +INSERT INTO [dbo].[Customers] ([FirstName], [MiddleInitial], [LastName], [mail]) +VALUES ('John', 'A', 'Doe', 'john.doe@example.com'), + ('Jane', NULL, 'Smith', 'jane.smith@example.com'), + ('Bob', 'R', 'Johnson', 'bob.johnson@example.com'), + ('Alice', 'M', 'Williams', 'alice.williams@example.com'), + ('Charlie', NULL, 'Brown', 'charlie.brown@example.com'); +GO + +PRINT 'Database initialization completed successfully!'; diff --git 
a/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml b/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml new file mode 100644 index 0000000..ca8f112 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml @@ -0,0 +1,191 @@ + + + 4.0.0 + + com.amazonaws + flink-cdc-sqlserver-sql-source + 1.0 + jar + + + UTF-8 + ${project.basedir}/target + ${project.name}-${project.version} + 11 + ${target.java.version} + ${target.java.version} + 1.20.0 + 3.4.0 + 3.3.0-1.20 + 9.3.0 + 42.7.2 + 1.2.0 + 2.23.1 + + + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-json + ${flink.version} + provided + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + org.apache.flink + flink-table-planner-loader + ${flink.version} + provided + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + org.apache.flink + flink-connector-base + ${flink.version} + provided + + + org.apache.flink + flink-connector-sqlserver-cdc + ${flink.cdc.version} + + + org.apache.flink + flink-connector-jdbc + ${flink.jdbc.connector.version} + + + + + + com.mysql + mysql-connector-j + ${mysql.jdbc.driver.version} + + + org.postgresql + postgresql + ${postgresql.jdbc.driver.version} + + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + + ${buildDirectory} + ${jar.finalName} + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + + *:* + + 
META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.amazonaws.services.msf.FlinkCDCSqlServer2JdbcJob + + + + + + + + + \ No newline at end of file diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServer2JdbcJob.java b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServer2JdbcJob.java new file mode 100644 index 0000000..1fbc14f --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServer2JdbcJob.java @@ -0,0 +1,157 @@ +package com.amazonaws.services.msf; + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamStatementSet; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; + +public class FlinkCDCSqlServer2JdbcJob { + private static final Logger LOG = LoggerFactory.getLogger(FlinkCDCSqlServer2JdbcJob.class); + + // Name of the local JSON resource with the application properties in the same format as they are received from the Amazon Managed Service for Apache Flink runtime + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + + private static final int DEFAULT_CDC_DB_PORT = 1433; + + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + /** + * Load application properties from Amazon Managed Service for Apache Flink runtime or from a local resource, when the environment is local 
+ */ + private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + FlinkCDCSqlServer2JdbcJob.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + public static void main(String[] args) throws Exception { + // set up the streaming execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build()); + + final Map<String, Properties> applicationProperties = loadApplicationProperties(env); + LOG.warn("Application properties: {}", applicationProperties); + + // Enable checkpoints and set parallelism when running locally + // On Managed Flink, checkpoints and application parallelism are managed by the service and controlled by the application configuration + if (isLocal(env)) { + env.setParallelism(1); // The SQL Server Flink CDC source is single-threaded + env.enableCheckpointing(30000); + } + + + // Create CDC source table + Properties cdcSourceProperties = applicationProperties.get("CDCSource"); + tableEnv.executeSql("CREATE TABLE Customers (" + + " CustomerID INT," + + " FirstName STRING," + + " MiddleInitial STRING," + + " LastName STRING," + + " mail STRING," + + // Some additional metadata columns for demonstration purposes + " `_change_processed_at` AS PROCTIME()," + // The time when Flink is processing this record + " `_source_updated_at` TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL," + // The time when the operation was executed on the db + " `_table_name` STRING METADATA FROM 'table_name' VIRTUAL," +
// Name of the table in the source db + " `_schema_name` STRING METADATA FROM 'schema_name' VIRTUAL, " + // Name of the schema in the source db + " `_db_name` STRING METADATA FROM 'database_name' VIRTUAL," + // name of the database + " PRIMARY KEY(CustomerID) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'sqlserver-cdc'," + + " 'hostname' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("hostname"), "missing CDC source hostname") + "'," + + " 'port' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("port", Integer.toString(DEFAULT_CDC_DB_PORT)), "missing CDC source port") + "'," + + " 'username' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("username"), "missing CDC source username") + "'," + + // For simplicity, we are passing the db password as a runtime configuration unencrypted. This should be avoided in production + " 'password' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("password"), "missing CDC source password") + "'," + + " 'database-name' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("database.name"), "missing CDC source database name") + "'," + + " 'table-name' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("table.name"), "missing CDC source table name") + "'" + + ")"); + + + // Create a JDBC sink table + // Note that the definition of the table is agnostic to the actual destination database (e.g. 
MySQL or PostgreSQL) + Properties jdbcSinkProperties = applicationProperties.get("JdbcSink"); + tableEnv.executeSql("CREATE TABLE DestinationTable (" + + " customer_id INT," + + " first_name STRING," + + " middle_initial STRING," + + " last_name STRING," + + " email STRING," + + " _source_updated_at TIMESTAMP(3)," + + " _change_processed_at TIMESTAMP(3)," + + " PRIMARY KEY(customer_id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'jdbc'," + + " 'url' = '" + Preconditions.checkNotNull(jdbcSinkProperties.getProperty("url"), "missing destination database JDBC URL") + "'," + + " 'table-name' = '" + Preconditions.checkNotNull(jdbcSinkProperties.getProperty("table.name"), "missing destination database table name") + "'," + + " 'username' = '" + Preconditions.checkNotNull(jdbcSinkProperties.getProperty("username"), "missing destination database username") + "'," + + " 'password' = '" + Preconditions.checkNotNull(jdbcSinkProperties.getProperty("password"), "missing destination database password") + "'" + + ")"); + + // When running locally we add a secondary sink to print the output to the console. + // When the job is running on Managed Flink, any output to the console is not visible and may cause overhead. + // It is recommended not to print any output to the console when running the application on Managed Flink. + if (isLocal(env)) { + tableEnv.executeSql("CREATE TABLE PrintSinkTable (" + + " CustomerID INT," + + " FirstName STRING," + + " MiddleInitial STRING," + + " LastName STRING," + + " mail STRING," + + " `_change_processed_at` TIMESTAMP_LTZ(3)," + + " `_source_updated_at` TIMESTAMP_LTZ(3)," + + " `_table_name` STRING," + + " `_schema_name` STRING," + + " `_db_name` STRING," + + " PRIMARY KEY(CustomerID) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'print'" + + ")"); + } + + // Note that we use a statement set to add the two "INSERT INTO..." statements. + // When tableEnv.executeSql(...)
is used with INSERT INTO on a job running in Application mode, like on Managed Flink, + // the first statement triggers the job execution, and any code which follows is ignored. + StreamStatementSet statementSet = tableEnv.createStatementSet(); + statementSet.addInsertSql("INSERT INTO DestinationTable (" + + "customer_id, " + + "first_name, " + + "middle_initial, " + + "last_name, " + + "email, " + + "_source_updated_at, " + + "_change_processed_at" + + ") SELECT " + + "CustomerID, " + + "FirstName, " + + "MiddleInitial, " + + "LastName, " + + "mail, " + + "`_source_updated_at`, " + + "`_change_processed_at` " + + "FROM Customers"); + if( isLocal(env)) { + statementSet.addInsertSql("INSERT INTO PrintSinkTable SELECT * FROM Customers"); + } + + + // Execute the two INSERT INTO statements + statementSet.execute(); + } +} diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/flink-application-properties-dev.json b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..8974b2f --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,22 @@ +[ + { + "PropertyGroupId": "CDCSource", + "PropertyMap": { + "hostname": "localhost", + "port": "1433", + "username": "flink_cdc", + "password": "FlinkCDC@123", + "database.name": "SampleDataPlatform", + "table.name": "dbo.Customers" + } + }, + { + "PropertyGroupId": "JdbcSink", + "PropertyMap": { + "table.name": "customers", + "url": "jdbc:postgresql://localhost:5432/targetdb", + "username": "flinkusr", + "password": "PassW0rd!" 
+ } + } +] \ No newline at end of file diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/log4j2.properties b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/log4j2.properties new file mode 100644 index 0000000..3546643 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/log4j2.properties @@ -0,0 +1,7 @@ +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/java/FlinkCDC/README.md b/java/FlinkCDC/README.md new file mode 100644 index 0000000..c714569 --- /dev/null +++ b/java/FlinkCDC/README.md @@ -0,0 +1,5 @@ +# Using Flink CDC Sources + +This folder contains examples of using Flink CDC Sources as source connectors in Amazon Managed Service for Apache Flink. + +* [Flink CDC SQL Server source (SQL)](./FlinkCDCSQLServerSource), writing to a JDBC sink. \ No newline at end of file diff --git a/java/pom.xml b/java/pom.xml index 54e412e..877b596 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -42,5 +42,6 @@ SQSSink S3AvroSink S3AvroSource + FlinkCDC/FlinkCDCSQLServerSource \ No newline at end of file