From bafb1b62fa5fb0ee05d4a5cc50e1062c24f8a19d Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Wed, 25 Jun 2025 12:50:42 +0100 Subject: [PATCH 1/6] Flink CDC SQL Server example --- .../FlinkCDCSQLServerSource/README.md | 20 ++ .../docker/docker-compose.yml | 41 ++++ .../FlinkCDCSQLServerSource/docker/init.sql | 56 ++++++ java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml | 175 ++++++++++++++++++ .../msf/FlinkCDCSqlServerSourceJob.java | 137 ++++++++++++++ .../flink-application-properties-dev.json | 20 ++ .../src/main/resources/log4j2.properties | 7 + java/FlinkCDC/README.md | 5 + java/pom.xml | 1 + 9 files changed, 462 insertions(+) create mode 100644 java/FlinkCDC/FlinkCDCSQLServerSource/README.md create mode 100644 java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml create mode 100644 java/FlinkCDC/FlinkCDCSQLServerSource/docker/init.sql create mode 100644 java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml create mode 100644 java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServerSourceJob.java create mode 100644 java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/flink-application-properties-dev.json create mode 100644 java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/log4j2.properties create mode 100644 java/FlinkCDC/README.md diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md new file mode 100644 index 00000000..153ddd08 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md @@ -0,0 +1,20 @@ +# FlinkCDC SQL Server source example + +This example shows how to capture data from a database (SQL Server in this case) directly from Flink using a Flink CDC source connector. + +Flink version: 1.20 +Flink API: SQL +Language: Java (11) +Flink connectors: Flink CDC SQL Server source (3.4), DynamoDB sink + +The job is implemented in SQL embedded in Java. 
It uses the [Flink CDC SQL Server source connector](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/sqlserver-cdc/) +to capture changes from a database, and sink data to a DynamoDB table doing upsert by primary key, so the destination table always contains the latest state of each record. + +> ⚠️ DynamoDB SQL sink does not currently support DELETE (see [FLINK-35500](https://issues.apache.org/jira/browse/FLINK-35500)) + +TBD + +## References + +* [Flink CDC SQL Server documentation](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/sqlserver-cdc) +* [Debezium SQL Server documentation](https://debezium.io/documentation/reference/1.9/connectors/sqlserver.html) \ No newline at end of file diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml new file mode 100644 index 00000000..2f568035 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml @@ -0,0 +1,41 @@ +services: + mssql: + image: mcr.microsoft.com/mssql/server:2022-latest + container_name: mssql-server-2022 + environment: + - ACCEPT_EULA=Y + - SA_PASSWORD=YourStrong@Passw0rd + - MSSQL_PID=Developer + - MSSQL_AGENT_ENABLED=true + ports: + - "1433:1433" + volumes: + - mssql_data:/var/opt/mssql + - ./init.sql:/tmp/init.sql + restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", "/opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P YourStrong@Passw0rd -Q 'SELECT 1' -C"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + + mssql-init: + image: mcr.microsoft.com/mssql/server:2022-latest + depends_on: + mssql: + condition: service_healthy + volumes: + - ./init.sql:/tmp/init.sql + command: > + bash -c " + echo 'Waiting for SQL Server to be ready...' && + sleep 5 && + echo 'Running initialization script...' 
&& + /opt/mssql-tools18/bin/sqlcmd -S mssql -U sa -P YourStrong@Passw0rd -i /tmp/init.sql -C && + echo 'Initialization completed!' + " + +volumes: + mssql_data: + driver: local diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/init.sql b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/init.sql new file mode 100644 index 00000000..95f02528 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/init.sql @@ -0,0 +1,56 @@ +-- Create SampleDataPlatform database +CREATE DATABASE SampleDataPlatform; +GO + +-- Use the SampleDataPlatform database +USE SampleDataPlatform; +GO + +-- Create login for flink_cdc +CREATE LOGIN flink_cdc WITH PASSWORD = 'FlinkCDC@123'; +GO + +-- Create user in SampleDataPlatform database +CREATE USER flink_cdc FOR LOGIN flink_cdc; +GO + +-- Grant necessary permissions for CDC operations +ALTER ROLE db_owner ADD MEMBER flink_cdc; +GO + +-- Enable CDC on the SampleDataPlatform database +USE SampleDataPlatform; +EXEC sys.sp_cdc_enable_db; +GO + +-- Create Customers table with the specified schema +CREATE TABLE [dbo].[Customers]( + [CustomerID] [int] IDENTITY(1,1) NOT NULL, + [FirstName] [nvarchar](40) NOT NULL, + [MiddleInitial] [nvarchar](40) NULL, + [LastName] [nvarchar](40) NOT NULL, + [mail] [varchar](50) NULL, + CONSTRAINT [CustomerPK] PRIMARY KEY CLUSTERED +( + [CustomerID] ASC +)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY] +) ON [PRIMARY]; +GO + +-- Enable CDC on the Customers table +EXEC sys.sp_cdc_enable_table + @source_schema = N'dbo', + @source_name = N'Customers', + @role_name = NULL; +GO + +-- Insert some sample data +INSERT INTO [dbo].[Customers] ([FirstName], [MiddleInitial], [LastName], [mail]) VALUES + ('John', 'A', 'Doe', 'john.doe@example.com'), + ('Jane', NULL, 'Smith', 'jane.smith@example.com'), + ('Bob', 'R', 'Johnson', 'bob.johnson@example.com'), + ('Alice', 'M', 'Williams', 
'alice.williams@example.com'), + ('Charlie', NULL, 'Brown', 'charlie.brown@example.com'); +GO + +PRINT 'Database initialization completed successfully!'; diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml b/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml new file mode 100644 index 00000000..ba5c0393 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml @@ -0,0 +1,175 @@ + + + 4.0.0 + + com.amazonaws + flink-cdc-sqlserver-sql-source + 1.0 + jar + + + UTF-8 + ${project.basedir}/target + ${project.name}-${project.version} + 11 + ${target.java.version} + ${target.java.version} + 1.20.0 + 3.4.0 + 5.0.0-1.20 + 1.2.0 + 2.23.1 + + + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-json + ${flink.version} + provided + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + org.apache.flink + flink-table-planner-loader + ${flink.version} + provided + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + org.apache.flink + flink-connector-base + ${flink.version} + provided + + + org.apache.flink + flink-connector-sqlserver-cdc + ${flink.cdc.version} + + + org.apache.flink + flink-connector-dynamodb + ${aws.connectors.version} + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + + ${buildDirectory} + ${jar.finalName} + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + + *:* + + 
META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.amazonaws.services.msf.FlinkCDCMySQLSourceJob + + + + + + + + + \ No newline at end of file diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServerSourceJob.java b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServerSourceJob.java new file mode 100644 index 00000000..82a61696 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServerSourceJob.java @@ -0,0 +1,137 @@ +package com.amazonaws.services.msf; + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamStatementSet; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; + +public class FlinkCDCSqlServerSourceJob { + private static final Logger LOG = LoggerFactory.getLogger(FlinkCDCSqlServerSourceJob.class); + + // Name of the local JSON resource with the application properties in the same format as they are received from the Amazon Managed Service for Apache Flink runtime + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + + private static final int DEFAULT_CDC_DB_PORT = 1433; + private static final String DEFAULT_CDC_DATABASE_NAME = "SampleDataPlatform"; + private static final String DEFAULT_CDC_TABLE_NAME = "dbo.Customers"; + private static final String DEFAULT_DDB_TABLE_NAME = "Customers"; + + private static boolean 
isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + /** + * Load application properties from Amazon Managed Service for Apache Flink runtime or from a local resource, when the environment is local + */ + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + FlinkCDCSqlServerSourceJob.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + public static void main(String[] args) throws Exception { + // set up the streaming execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build()); + + final Map applicationProperties = loadApplicationProperties(env); + LOG.warn("Application properties: {}", applicationProperties); + + // Enable checkpoints and set parallelism when running locally + // On Managed Flink, checkpoints and application parallelism are managed by the service and controlled by the application configuration + if (isLocal(env)) { + env.setParallelism(1); // Ms SQL Server Flink CDC is single-threaded + env.enableCheckpointing(30000); + } + + + // Create CDC source table + Properties cdcSourceProperties = applicationProperties.get("CDCSource"); + tableEnv.executeSql("CREATE TABLE Customers (" + + " CustomerID INT," + + " FirstName STRING," + + " MiddleInitial STRING," + + " LastName STRING," + + " mail STRING," + + // Some additional metadata columns for demonstration purposes + " `_ingestion_ts` AS PROCTIME()," + // The 
time when Flink is processing this record + " `_operation_ts` TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL," + // The time when the operation was executed on the db + " `_table_name` STRING METADATA FROM 'table_name' VIRTUAL," + // Name of the table in the source db + " `_schema_name` STRING METADATA FROM 'schema_name' VIRTUAL, " + // Name of the schema in the source db + " `_db_name` STRING METADATA FROM 'database_name' VIRTUAL," + // name of the database + " PRIMARY KEY(CustomerID) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'sqlserver-cdc'," + + " 'hostname' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("hostname"), "missing CDC source hostname") + "'," + + " 'port' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("port", Integer.toString(DEFAULT_CDC_DB_PORT)), "missing CDC source port") + "'," + + " 'username' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("username"), "missing CDC source username") + "'," + + // For simplicity, we are passing the db password as a runtime configuration unencrypted. 
This should be avoided in production + " 'password' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("password"), "missing CDC source password") + "'," + + " 'database-name' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("database.name", DEFAULT_CDC_DATABASE_NAME), "missing CDC source database name") + "'," + + " 'table-name' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("table.name", DEFAULT_CDC_TABLE_NAME), "missing CDC source table name") + "'" + + ")"); + + + // Create a DynamoDB sink table + Properties dynamoDBProperties = applicationProperties.get("DynamoDBSink"); + tableEnv.executeSql("CREATE TABLE DDBSinkTable (" + + " CustomerID INT," + + " FirstName STRING," + + " MiddleInitial STRING," + + " LastName STRING," + + " mail STRING," + + " `_ingestion_ts` TIMESTAMP_LTZ(3)," + + " `_operation_ts` TIMESTAMP_LTZ(3)," + + " `_table_name` STRING," + + " `_schema_name` STRING," + + " `_db_name` STRING" + + // " PRIMARY KEY(CustomerID) NOT ENFORCED" + + ") PARTITIONED BY (`CustomerID`) " + + "WITH (" + + " 'connector' = 'dynamodb'," + + " 'table-name' = '" + Preconditions.checkNotNull(dynamoDBProperties.getProperty("table.name", DEFAULT_DDB_TABLE_NAME), "missing DynamoDB table name") + "'," + + " 'aws.region' = '" + Preconditions.checkNotNull(dynamoDBProperties.getProperty("aws.region"), "missing AWS region") + "'" + + ")"); + + // While developing locally, you can comment the sink table to DynamoDB and uncomment the following table to print records to the console + // When the job is running on Managed Flink any output to console is not visible + tableEnv.executeSql("CREATE TABLE PrintSinkTable (" + + " CustomerID INT," + + " FirstName STRING," + + " MiddleInitial STRING," + + " LastName STRING," + + " mail STRING," + + " `_ingestion_ts` TIMESTAMP_LTZ(3)," + + " `_operation_ts` TIMESTAMP_LTZ(3)," + + " `_table_name` STRING," + + " `_schema_name` STRING," + + " `_db_name` STRING," + + " PRIMARY KEY(CustomerID) 
NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'print'" + + ")"); + + + StreamStatementSet statementSet = tableEnv.createStatementSet(); + statementSet.addInsertSql("INSERT INTO DDBSinkTable SELECT * FROM Customers"); + statementSet.addInsertSql("INSERT INTO PrintSinkTable SELECT * FROM Customers"); + + // Execute the two INSERT INTO statements + statementSet.execute(); + } +} diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/flink-application-properties-dev.json b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 00000000..d7cdbdf0 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,20 @@ +[ + { + "PropertyGroupId": "CDCSource", + "PropertyMap": { + "hostname": "localhost", + "port": "1433", + "username": "flink_cdc", + "password": "FlinkCDC@123", + "database.name": "SampleDataPlatform", + "table.name": "dbo.Customers" + } + }, + { + "PropertyGroupId": "DynamoDBSink", + "PropertyMap": { + "table.name": "Customers", + "aws.region": "us-east-1" + } + } +] \ No newline at end of file diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/log4j2.properties b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/log4j2.properties new file mode 100644 index 00000000..35466433 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/log4j2.properties @@ -0,0 +1,7 @@ +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/java/FlinkCDC/README.md b/java/FlinkCDC/README.md new file mode 100644 index 00000000..6f89e60f --- /dev/null +++ b/java/FlinkCDC/README.md @@ -0,0 +1,5 @@ +# Using Flink CDC Sources + +This folder contains examples showing 
using Flink CDC Sources as source connectors in Amazon Managed Service for Apache Flink + +* [Flink CDC SQL Server source (SQL)](./FlinkCDCSQLServerSource) \ No newline at end of file diff --git a/java/pom.xml b/java/pom.xml index 54e412e7..877b5965 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -42,5 +42,6 @@ SQSSink S3AvroSink S3AvroSource + FlinkCDC/FlinkCDCSQLServerSource \ No newline at end of file From e67a6682e8ef628150cd2e15d4e1c3c1e5c1927e Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Wed, 25 Jun 2025 15:36:32 +0100 Subject: [PATCH 2/6] Fix POM and init script --- java/FlinkCDC/FlinkCDCSQLServerSource/README.md | 17 +++++++++++++++++ .../FlinkCDCSQLServerSource/docker/init.sql | 3 ++- java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml | 2 +- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md index 153ddd08..6566bf62 100644 --- a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md @@ -14,6 +14,23 @@ to capture changes from a database, and sink data to a DynamoDB table doing upse TBD +### Requirements + +1. SQL Server Agent must be running +2. The user passed to FlinCDC must be `db_owner` for the database +3. 
CDC must be enabled both on the database AND on the table + ```sql + USE MyDB; + EXEC sys.sp_cdc_enable_db; + + EXEC sys.sp_cdc_enable_table + @source_schema = N'dbo', + @source_name = N'Customers', + @role_name = NULL, + @supports_net_changes = 0; + ``` + + ## References * [Flink CDC SQL Server documentation](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/sqlserver-cdc) diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/init.sql b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/init.sql index 95f02528..8e283f1d 100644 --- a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/init.sql +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/init.sql @@ -41,7 +41,8 @@ GO EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'Customers', - @role_name = NULL; + @role_name = NULL, + @supports_net_changes = 0; GO -- Insert some sample data diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml b/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml index ba5c0393..641ef0b3 100644 --- a/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml @@ -163,7 +163,7 @@ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> - com.amazonaws.services.msf.FlinkCDCMySQLSourceJob + com.amazonaws.services.msf.FlinkCDCSqlServerSourceJob From c2ecd9f63ad9a119b259ed26702f2158ba7a70fe Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Wed, 25 Jun 2025 15:45:34 +0100 Subject: [PATCH 3/6] Rename init script --- .../docker/docker-compose.yml | 6 +- .../FlinkCDCSQLServerSource/docker/init.sql | 57 ------------------- .../docker/sqlserver-init.sql | 56 ++++++++++++++++++ 3 files changed, 59 insertions(+), 60 deletions(-) delete mode 100644 java/FlinkCDC/FlinkCDCSQLServerSource/docker/init.sql create mode 100644 java/FlinkCDC/FlinkCDCSQLServerSource/docker/sqlserver-init.sql diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml 
b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml index 2f568035..1a5abc6a 100644 --- a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml @@ -11,7 +11,7 @@ services: - "1433:1433" volumes: - mssql_data:/var/opt/mssql - - ./init.sql:/tmp/init.sql + - ./sqlserver-init.sql:/tmp/sqlserver-init.sql restart: unless-stopped healthcheck: test: ["CMD-SHELL", "/opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P YourStrong@Passw0rd -Q 'SELECT 1' -C"] @@ -26,13 +26,13 @@ services: mssql: condition: service_healthy volumes: - - ./init.sql:/tmp/init.sql + - ./sqlserver-init.sql:/tmp/sqlserver-init.sql command: > bash -c " echo 'Waiting for SQL Server to be ready...' && sleep 5 && echo 'Running initialization script...' && - /opt/mssql-tools18/bin/sqlcmd -S mssql -U sa -P YourStrong@Passw0rd -i /tmp/init.sql -C && + /opt/mssql-tools18/bin/sqlcmd -S mssql -U sa -P YourStrong@Passw0rd -i /tmp/sqlserver-init.sql -C && echo 'Initialization completed!' 
" diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/init.sql b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/init.sql deleted file mode 100644 index 8e283f1d..00000000 --- a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/init.sql +++ /dev/null @@ -1,57 +0,0 @@ --- Create SampleDataPlatform database -CREATE DATABASE SampleDataPlatform; -GO - --- Use the SampleDataPlatform database -USE SampleDataPlatform; -GO - --- Create login for flink_cdc -CREATE LOGIN flink_cdc WITH PASSWORD = 'FlinkCDC@123'; -GO - --- Create user in SampleDataPlatform database -CREATE USER flink_cdc FOR LOGIN flink_cdc; -GO - --- Grant necessary permissions for CDC operations -ALTER ROLE db_owner ADD MEMBER flink_cdc; -GO - --- Enable CDC on the SampleDataPlatform database -USE SampleDataPlatform; -EXEC sys.sp_cdc_enable_db; -GO - --- Create Customers table with the specified schema -CREATE TABLE [dbo].[Customers]( - [CustomerID] [int] IDENTITY(1,1) NOT NULL, - [FirstName] [nvarchar](40) NOT NULL, - [MiddleInitial] [nvarchar](40) NULL, - [LastName] [nvarchar](40) NOT NULL, - [mail] [varchar](50) NULL, - CONSTRAINT [CustomerPK] PRIMARY KEY CLUSTERED -( - [CustomerID] ASC -)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY] -) ON [PRIMARY]; -GO - --- Enable CDC on the Customers table -EXEC sys.sp_cdc_enable_table - @source_schema = N'dbo', - @source_name = N'Customers', - @role_name = NULL, - @supports_net_changes = 0; -GO - --- Insert some sample data -INSERT INTO [dbo].[Customers] ([FirstName], [MiddleInitial], [LastName], [mail]) VALUES - ('John', 'A', 'Doe', 'john.doe@example.com'), - ('Jane', NULL, 'Smith', 'jane.smith@example.com'), - ('Bob', 'R', 'Johnson', 'bob.johnson@example.com'), - ('Alice', 'M', 'Williams', 'alice.williams@example.com'), - ('Charlie', NULL, 'Brown', 'charlie.brown@example.com'); -GO - -PRINT 'Database initialization completed successfully!'; diff 
--git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/sqlserver-init.sql b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/sqlserver-init.sql new file mode 100644 index 00000000..4fb43a98 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/sqlserver-init.sql @@ -0,0 +1,56 @@ +-- Create SampleDataPlatform database +CREATE DATABASE SampleDataPlatform; +GO + +-- Use the SampleDataPlatform database +USE SampleDataPlatform; +GO + +-- Create login for flink_cdc +CREATE LOGIN flink_cdc WITH PASSWORD = 'FlinkCDC@123'; +GO + +-- Create user in SampleDataPlatform database +CREATE USER flink_cdc FOR LOGIN flink_cdc; +GO + +-- Grant necessary permissions for CDC operations +ALTER ROLE db_owner ADD MEMBER flink_cdc; +GO + +-- Enable CDC on the SampleDataPlatform database +USE SampleDataPlatform; +EXEC sys.sp_cdc_enable_db; +GO + +-- Create Customers table with the specified schema +CREATE TABLE [dbo].[Customers] +( + [CustomerID] [int] IDENTITY (1,1) NOT NULL, + [FirstName] [nvarchar](40) NOT NULL, + [MiddleInitial] [nvarchar](40) NULL, + [LastName] [nvarchar](40) NOT NULL, + [mail] [varchar](50) NULL, + CONSTRAINT [CustomerPK] PRIMARY KEY CLUSTERED ([CustomerID] ASC) + WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY] +) ON [PRIMARY]; +GO + +-- Enable CDC on the Customers table +EXEC sys.sp_cdc_enable_table + @source_schema = N'dbo', + @source_name = N'Customers', + @role_name = NULL, + @supports_net_changes = 0; +GO + +-- Insert some sample data +INSERT INTO [dbo].[Customers] ([FirstName], [MiddleInitial], [LastName], [mail]) +VALUES ('John', 'A', 'Doe', 'john.doe@example.com'), + ('Jane', NULL, 'Smith', 'jane.smith@example.com'), + ('Bob', 'R', 'Johnson', 'bob.johnson@example.com'), + ('Alice', 'M', 'Williams', 'alice.williams@example.com'), + ('Charlie', NULL, 'Brown', 'charlie.brown@example.com'); +GO + +PRINT 'Database initialization completed 
successfully!'; From f20529a57ac3e4820e8659a74bbfcd63637a6723 Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Thu, 26 Jun 2025 11:38:12 +0100 Subject: [PATCH 4/6] Converted Flink CDC SQL Server example to write to MySQL or PostgreSQL --- .../FlinkCDCSQLServerSource/README.md | 114 +++++++++++++++--- .../docker/docker-compose.yml | 52 ++++++-- .../docker/mysql-init/init.sql | 15 +++ .../docker/postgres-init/init.sql | 10 ++ .../init.sql} | 0 java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml | 24 +++- ...ob.java => FlinkCDCSqlServer2JdbcJob.java} | 113 +++++++++-------- .../flink-application-properties-dev.json | 8 +- java/FlinkCDC/README.md | 2 +- 9 files changed, 257 insertions(+), 81 deletions(-) create mode 100644 java/FlinkCDC/FlinkCDCSQLServerSource/docker/mysql-init/init.sql create mode 100644 java/FlinkCDC/FlinkCDCSQLServerSource/docker/postgres-init/init.sql rename java/FlinkCDC/FlinkCDCSQLServerSource/docker/{sqlserver-init.sql => sqlserver-init/init.sql} (100%) rename java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/{FlinkCDCSqlServerSourceJob.java => FlinkCDCSqlServer2JdbcJob.java} (58%) diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md index 6566bf62..6bf47cd3 100644 --- a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md @@ -5,30 +5,110 @@ This example shows how to capture data from a database (SQL Server in this case) Flink version: 1.20 Flink API: SQL Language: Java (11) -Flink connectors: Flink CDC SQL Server source (3.4), DynamoDB sink +Flink connectors: Flink CDC SQL Server source (3.4), JDBC sink The job is implemented in SQL embedded in Java. 
It uses the [Flink CDC SQL Server source connector](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/sqlserver-cdc/) -to capture changes from a database, and sink data to a DynamoDB table doing upsert by primary key, so the destination table always contains the latest state of each record. +to capture changes from a database, and propagates the changes to a different database using [JDBC Sink connector](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/jdbc/). -> ⚠️ DynamoDB SQL sink does not currently support DELETE (see [FLINK-35500](https://issues.apache.org/jira/browse/FLINK-35500)) +### Source database -TBD +This example uses Ms SQL Server as source database. To use a different database ad CDC source you need to use a different Flink CDC Source connector. + +Different Flink CDC Sources require different configurations and support different metadata fields. To switch the source to a different database you need to modify the code. + +See [Flink CDC Sources documentation](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/sqlserver-cdc) for further details. + + +### Destination database + +Note that the JDBC sink is agnostic to the actual destination database technology. This example is tested with both MySQL +and PostgreSQL but can be easily adjusted to different databases. + +The `JdbcSink` `url` decides the destination database (see [Runtime configuration](#runtime-configuration), below). +The correct JDBC driver must be included in the `pom.xml`. This example includes both MySQL and PostgreSQL drivers. + +### Testing locally using Docker Compose + +This example can be run locally using Docker. + +A [Docker Compose file](./docker/docker-compose.yml) is provided to run local SQL Server, MySQL and PostgreSQL databases. +The local databases are initialized creating users, databases and tables. 
Some initial data are also inserted into the source table. + +You can run the Flink application inside your IDE following the instructions in [Running in IntelliJ](#running-in-intellij). +The default local configuration connects to the local PostgreSQL db defined in Docker Compose. + +To start the local databases run `docker compose up -d` in the `./docker` folder. + +Use `docker compose down -v` to shut them down, also removing the data volumes. -### Requirements -1. SQL Server Agent must be running -2. The user passed to FlinCDC must be `db_owner` for the database -3. CDC must be enabled both on the database AND on the table - ```sql - USE MyDB; - EXEC sys.sp_cdc_enable_db; +### Database prerequisites + +When running on Amazon Managed Service for Apache Flink and with databases on AWS, you need to set up the databases manually, ensuring you set up all the following: + +> You can find the SQL scripts which set up the dockerized databases checking out the init scripts for +> [SQL Server](docker/sqlserver-init/init.sql), [MySQL](docker/mysql-init/init.sql), +> and [PostgreSQL](docker/postgres-init/init.sql). + +1. **Source database (Ms SQL Server)** + 1. SQL Server Agent must be running + 2. Native (user/password) authentication must be enabled + 3. The login used by FlinCDC (e.g. `flink_cdc`) must be `db_owner` for the database + 4. The source database and table must match the `database.name`and `table.name` you specify in the source configuration (e.g. `SampleDataPlatform` and `Customers`) + 5. The source table with must have this schema: + ```sql + CREATE TABLE [dbo].[Customers] + ( + [CustomerID] [int] IDENTITY (1,1) NOT NULL, + [FirstName] [nvarchar](40) NOT NULL, + [MiddleInitial] [nvarchar](40) NULL, + [LastName] [nvarchar](40) NOT NULL, + [mail] [varchar](50) NULL, + CONSTRAINT [CustomerPK] PRIMARY KEY CLUSTERED ([CustomerID] ASC) + ) ON [PRIMARY]; + ``` + 6. 
CDC must be enabled both on the source database AND on the table + ```sql + USE MyDB; + EXEC sys.sp_cdc_enable_db; - EXEC sys.sp_cdc_enable_table - @source_schema = N'dbo', - @source_name = N'Customers', - @role_name = NULL, - @supports_net_changes = 0; - ``` + EXEC sys.sp_cdc_enable_table + @source_schema = N'dbo', + @source_name = N'Customers', + @role_name = NULL, + @supports_net_changes = 0; + ``` +2. **Destination database (MySQL or PostgreSQL)** + 1. The destination database name must match the `url` configured in the JDBC sink + 2. The destination table must have the following schema + ```sql + CREATE TABLE customers ( + customer_id INT PRIMARY KEY, + first_name VARCHAR(40), + middle_initial VARCHAR(40), + last_name VARCHAR(40), + email VARCHAR(50), + _source_updated_at TIMESTAMP, + _change_processed_at TIMESTAMP + ); + ``` + 3. The destination database user must have SELECT, INSERT, UPDATE and DELETE permissions on the destination table + +### Running in IntelliJ + +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. + +See [Running examples locally](../../running-examples-locally.md) for details. 
+ + +### Running on Amazon Managed Service for Apache Flink + +TBD +(set up VPC connectivity) + +### Runtime configuration + +TBD ## References diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml index 1a5abc6a..ca6abb46 100644 --- a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml @@ -1,5 +1,7 @@ services: - mssql: + + # Ms SQL Server + init + sqlserver: image: mcr.microsoft.com/mssql/server:2022-latest container_name: mssql-server-2022 environment: @@ -10,8 +12,8 @@ services: ports: - "1433:1433" volumes: - - mssql_data:/var/opt/mssql - - ./sqlserver-init.sql:/tmp/sqlserver-init.sql + - sqlserver_data:/var/opt/mssql + - ./sqlserver-init/init.sql:/tmp/init.sql restart: unless-stopped healthcheck: test: ["CMD-SHELL", "/opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P YourStrong@Passw0rd -Q 'SELECT 1' -C"] @@ -20,22 +22,56 @@ services: retries: 5 start_period: 10s - mssql-init: + sqlserver-init: image: mcr.microsoft.com/mssql/server:2022-latest depends_on: - mssql: + sqlserver: condition: service_healthy volumes: - - ./sqlserver-init.sql:/tmp/sqlserver-init.sql + - ./sqlserver-init/init.sql:/tmp/init.sql command: > bash -c " echo 'Waiting for SQL Server to be ready...' && sleep 5 && echo 'Running initialization script...' && - /opt/mssql-tools18/bin/sqlcmd -S mssql -U sa -P YourStrong@Passw0rd -i /tmp/sqlserver-init.sql -C && + /opt/mssql-tools18/bin/sqlcmd -S sqlserver -U sa -P YourStrong@Passw0rd -i /tmp/init.sql -C && echo 'Initialization completed!' " + # MySQL database + mysql: + image: mysql:8.0 + container_name: mysql_db + restart: always + environment: + MYSQL_ROOT_PASSWORD: R00tpwd! 
+ MYSQL_DATABASE: targetdb + ports: + - "3306:3306" + volumes: + - mysql_data:/var/lib/mysql + - ./mysql-init:/docker-entrypoint-initdb.d + command: --default-authentication-plugin=mysql_native_password + + # PostgreSQL database + postgres: + image: postgres:15 + container_name: postgres_db + restart: always + environment: + POSTGRES_DB: targetdb + POSTGRES_USER: flinkusr + POSTGRES_PASSWORD: PassW0rd! + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + - ./postgres-init:/docker-entrypoint-initdb.d + volumes: - mssql_data: + sqlserver_data: + driver: local + mysql_data: + driver: local + postgres_data: driver: local diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/mysql-init/init.sql b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/mysql-init/init.sql new file mode 100644 index 00000000..211e432d --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/mysql-init/init.sql @@ -0,0 +1,15 @@ +CREATE USER 'flinkusr'@'%' IDENTIFIED BY 'PassW0rd!'; +GRANT SELECT, INSERT, UPDATE, DELETE, SHOW DATABASES ON *.* TO 'flinkusr'@'%'; + +FLUSH PRIVILEGES; + +-- Create customer table +CREATE TABLE customers ( + customer_id INT PRIMARY KEY, + first_name VARCHAR(40), + middle_initial VARCHAR(40), + last_name VARCHAR(40), + email VARCHAR(50), + _source_updated_at TIMESTAMP, + _change_processed_at TIMESTAMP +); diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/postgres-init/init.sql b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/postgres-init/init.sql new file mode 100644 index 00000000..05e90511 --- /dev/null +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/postgres-init/init.sql @@ -0,0 +1,10 @@ +-- Create customer table +CREATE TABLE customers ( + customer_id INT PRIMARY KEY, + first_name VARCHAR(40), + middle_initial VARCHAR(40), + last_name VARCHAR(40), + email VARCHAR(50), + _source_updated_at TIMESTAMP, + _change_processed_at TIMESTAMP +); diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/sqlserver-init.sql 
b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/sqlserver-init/init.sql similarity index 100% rename from java/FlinkCDC/FlinkCDCSQLServerSource/docker/sqlserver-init.sql rename to java/FlinkCDC/FlinkCDCSQLServerSource/docker/sqlserver-init/init.sql diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml b/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml index 641ef0b3..ca8f1123 100644 --- a/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml @@ -18,7 +18,9 @@ ${target.java.version} 1.20.0 3.4.0 - 5.0.0-1.20 + 3.3.0-1.20 + 9.3.0 + 42.7.2 1.2.0 2.23.1 @@ -86,10 +88,24 @@ org.apache.flink - flink-connector-dynamodb - ${aws.connectors.version} + flink-connector-jdbc + ${flink.jdbc.connector.version} + + + + com.mysql + mysql-connector-j + ${mysql.jdbc.driver.version} + + + org.postgresql + postgresql + ${postgresql.jdbc.driver.version} + + + @@ -163,7 +179,7 @@ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> - com.amazonaws.services.msf.FlinkCDCSqlServerSourceJob + com.amazonaws.services.msf.FlinkCDCSqlServer2JdbcJob diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServerSourceJob.java b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServer2JdbcJob.java similarity index 58% rename from java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServerSourceJob.java rename to java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServer2JdbcJob.java index 82a61696..6a9f4452 100644 --- a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServerSourceJob.java +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServer2JdbcJob.java @@ -14,16 +14,13 @@ import java.util.Map; import java.util.Properties; -public class FlinkCDCSqlServerSourceJob { - 
private static final Logger LOG = LoggerFactory.getLogger(FlinkCDCSqlServerSourceJob.class); +public class FlinkCDCSqlServer2JdbcJob { + private static final Logger LOG = LoggerFactory.getLogger(FlinkCDCSqlServer2JdbcJob.class); // Name of the local JSON resource with the application properties in the same format as they are received from the Amazon Managed Service for Apache Flink runtime private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; private static final int DEFAULT_CDC_DB_PORT = 1433; - private static final String DEFAULT_CDC_DATABASE_NAME = "SampleDataPlatform"; - private static final String DEFAULT_CDC_TABLE_NAME = "dbo.Customers"; - private static final String DEFAULT_DDB_TABLE_NAME = "Customers"; private static boolean isLocal(StreamExecutionEnvironment env) { return env instanceof LocalStreamEnvironment; @@ -36,7 +33,7 @@ private static Map loadApplicationProperties(StreamExecution if (isLocal(env)) { LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( - FlinkCDCSqlServerSourceJob.class.getClassLoader() + FlinkCDCSqlServer2JdbcJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); @@ -69,8 +66,8 @@ public static void main(String[] args) throws Exception { " LastName STRING," + " mail STRING," + // Some additional metadata columns for demonstration purposes - " `_ingestion_ts` AS PROCTIME()," + // The time when Flink is processing this record - " `_operation_ts` TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL," + // The time when the operation was executed on the db + " `_change_processed_at` AS PROCTIME()," + // The time when Flink is processing this record + " `_source_updated_at` TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL," + // The time when the operation was 
executed on the db " `_table_name` STRING METADATA FROM 'table_name' VIRTUAL," + // Name of the table in the source db " `_schema_name` STRING METADATA FROM 'schema_name' VIRTUAL, " + // Name of the schema in the source db " `_db_name` STRING METADATA FROM 'database_name' VIRTUAL," + // name of the database @@ -82,54 +79,74 @@ public static void main(String[] args) throws Exception { " 'username' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("username"), "missing CDC source username") + "'," + // For simplicity, we are passing the db password as a runtime configuration unencrypted. This should be avoided in production " 'password' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("password"), "missing CDC source password") + "'," + - " 'database-name' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("database.name", DEFAULT_CDC_DATABASE_NAME), "missing CDC source database name") + "'," + - " 'table-name' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("table.name", DEFAULT_CDC_TABLE_NAME), "missing CDC source table name") + "'" + + " 'database-name' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("database.name"), "missing CDC source database name") + "'," + + " 'table-name' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("table.name"), "missing CDC source table name") + "'" + ")"); - // Create a DynamoDB sink table - Properties dynamoDBProperties = applicationProperties.get("DynamoDBSink"); - tableEnv.executeSql("CREATE TABLE DDBSinkTable (" + - " CustomerID INT," + - " FirstName STRING," + - " MiddleInitial STRING," + - " LastName STRING," + - " mail STRING," + - " `_ingestion_ts` TIMESTAMP_LTZ(3)," + - " `_operation_ts` TIMESTAMP_LTZ(3)," + - " `_table_name` STRING," + - " `_schema_name` STRING," + - " `_db_name` STRING" + - // " PRIMARY KEY(CustomerID) NOT ENFORCED" + - ") PARTITIONED BY (`CustomerID`) " + - "WITH (" + - " 'connector' = 'dynamodb'," + - " 
'table-name' = '" + Preconditions.checkNotNull(dynamoDBProperties.getProperty("table.name", DEFAULT_DDB_TABLE_NAME), "missing DynamoDB table name") + "'," + - " 'aws.region' = '" + Preconditions.checkNotNull(dynamoDBProperties.getProperty("aws.region"), "missing AWS region") + "'" + - ")"); - - // While developing locally, you can comment the sink table to DynamoDB and uncomment the following table to print records to the console - // When the job is running on Managed Flink any output to console is not visible - tableEnv.executeSql("CREATE TABLE PrintSinkTable (" + - " CustomerID INT," + - " FirstName STRING," + - " MiddleInitial STRING," + - " LastName STRING," + - " mail STRING," + - " `_ingestion_ts` TIMESTAMP_LTZ(3)," + - " `_operation_ts` TIMESTAMP_LTZ(3)," + - " `_table_name` STRING," + - " `_schema_name` STRING," + - " `_db_name` STRING," + - " PRIMARY KEY(CustomerID) NOT ENFORCED" + + // Create a JDBC sink table + // Note that the definition of the table is agnostic to the actual destination database (e.g. 
MySQL or PostgreSQL) + Properties jdbcSinkProperties = applicationProperties.get("JdbcSink"); + tableEnv.executeSql("CREATE TABLE DestinationTable (" + + " customer_id INT," + + " first_name STRING," + + " middle_initial STRING," + + " last_name STRING," + + " email STRING," + + " _source_updated_at TIMESTAMP(3)," + + " _change_processed_at TIMESTAMP(3)," + + " PRIMARY KEY(customer_id) NOT ENFORCED" + ") WITH (" + - " 'connector' = 'print'" + + " 'connector' = 'jdbc'," + + " 'url' = '" + Preconditions.checkNotNull(jdbcSinkProperties.getProperty("url"), "missing destination database JDBC URL") + "'," + + " 'table-name' = '" + Preconditions.checkNotNull(jdbcSinkProperties.getProperty("table.name"), "missing destination database table name") + "'," + + " 'username' = '" + Preconditions.checkNotNull(jdbcSinkProperties.getProperty("username"), "missing destination database username") + "'," + + " 'password' = '" + Preconditions.checkNotNull(jdbcSinkProperties.getProperty("password"), "missing destination database password") + "'" + ")"); + // While developing locally it may be useful to also print the output to console. 
+ // When the job is running on Managed Flink any output to console is not visible and it would cause overhead + if( isLocal(env)) { + tableEnv.executeSql("CREATE TABLE PrintSinkTable (" + + " CustomerID INT," + + " FirstName STRING," + + " MiddleInitial STRING," + + " LastName STRING," + + " mail STRING," + + " `_change_processed_at` TIMESTAMP_LTZ(3)," + + " `_source_updated_at` TIMESTAMP_LTZ(3)," + + " `_table_name` STRING," + + " `_schema_name` STRING," + + " `_db_name` STRING," + + " PRIMARY KEY(CustomerID) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'print'" + + ")"); + } + StreamStatementSet statementSet = tableEnv.createStatementSet(); - statementSet.addInsertSql("INSERT INTO DDBSinkTable SELECT * FROM Customers"); - statementSet.addInsertSql("INSERT INTO PrintSinkTable SELECT * FROM Customers"); + statementSet.addInsertSql("INSERT INTO DestinationTable (" + + "customer_id, " + + "first_name, " + + "middle_initial, " + + "last_name, " + + "email, " + + "_source_updated_at, " + + "_change_processed_at" + + ") SELECT " + + "CustomerID, " + + "FirstName, " + + "MiddleInitial, " + + "LastName, " + + "mail, " + + "`_source_updated_at`, " + + "`_change_processed_at` " + + "FROM Customers"); + if( isLocal(env)) { + statementSet.addInsertSql("INSERT INTO PrintSinkTable SELECT * FROM Customers"); + } + // Execute the two INSERT INTO statements statementSet.execute(); diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/flink-application-properties-dev.json b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/flink-application-properties-dev.json index d7cdbdf0..8974b2f8 100644 --- a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/flink-application-properties-dev.json +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/flink-application-properties-dev.json @@ -11,10 +11,12 @@ } }, { - "PropertyGroupId": "DynamoDBSink", + "PropertyGroupId": "JdbcSink", "PropertyMap": { - "table.name": "Customers", - "aws.region": 
"us-east-1" + "table.name": "customers", + "url": "jdbc:postgresql://localhost:5432/targetdb", + "username": "flinkusr", + "password": "PassW0rd!" } } ] \ No newline at end of file diff --git a/java/FlinkCDC/README.md b/java/FlinkCDC/README.md index 6f89e60f..c7145692 100644 --- a/java/FlinkCDC/README.md +++ b/java/FlinkCDC/README.md @@ -2,4 +2,4 @@ This folder contains examples showing using Flink CDC Sources as source connectors in Amazon Managed Service for Apache Flink -* [Flink CDC SQL Server source (SQL)](./FlinkCDCSQLServerSource) \ No newline at end of file +* [Flink CDC SQL Server source (SQL)](./FlinkCDCSQLServerSource), writing to JDBC sink. \ No newline at end of file From 4a5cb6f00fcbe87a92c2f34cab8c470f2908cbe7 Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Thu, 26 Jun 2025 15:55:49 +0100 Subject: [PATCH 5/6] Complete the README --- .../FlinkCDCSQLServerSource/README.md | 55 +++++++++++++++---- 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md index 6bf47cd3..dc7da312 100644 --- a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md @@ -7,8 +7,10 @@ Flink API: SQL Language: Java (11) Flink connectors: Flink CDC SQL Server source (3.4), JDBC sink -The job is implemented in SQL embedded in Java. It uses the [Flink CDC SQL Server source connector](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/sqlserver-cdc/) -to capture changes from a database, and propagates the changes to a different database using [JDBC Sink connector](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/jdbc/). +The job is implemented in SQL embedded in Java. 
+It uses the [Flink CDC SQL Server source connector](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/sqlserver-cdc/) +to capture changes from a database. +Changes are propagated to the destination database using [JDBC Sink connector](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/jdbc/). ### Source database @@ -21,13 +23,13 @@ See [Flink CDC Sources documentation](https://nightlies.apache.org/flink/flink-c ### Destination database -Note that the JDBC sink is agnostic to the actual destination database technology. This example is tested with both MySQL -and PostgreSQL but can be easily adjusted to different databases. +Note that the JDBC sink is agnostic to the actual destination database technology. +This example is tested with both MySQL and PostgreSQL but can be easily adjusted to different databases. The `JdbcSink` `url` decides the destination database (see [Runtime configuration](#runtime-configuration), below). The correct JDBC driver must be included in the `pom.xml`. This example includes both MySQL and PostgreSQL drivers. -### Testing locally using Docker Compose +### Testing with local databases using Docker Compose This example can be run locally using Docker. @@ -67,11 +69,17 @@ When running on Amazon Managed Service for Apache Flink and with databases on AW CONSTRAINT [CustomerPK] PRIMARY KEY CLUSTERED ([CustomerID] ASC) ) ON [PRIMARY]; ``` - 6. CDC must be enabled both on the source database AND on the table + 6. CDC must be enabled both on the source database. on Amazon RDS SQL Server use the following stored procedure: + ```sql + exec msdb.dbo.rds_cdc_enable_db 'MyDB' + ``` + On self-managed SQL server you need to call a different procedure, while in the database: ```sql USE MyDB; EXEC sys.sp_cdc_enable_db; - + ``` + 7. 
CDC must also be enabled on the table: + ```sql EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'Customers', @@ -97,19 +105,44 @@ When running on Amazon Managed Service for Apache Flink and with databases on AW ### Running in IntelliJ You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. +Run the databases locally using Docker Compose, as described [above](#testing-with-local-databases-using-docker-compose). -See [Running examples locally](../../running-examples-locally.md) for details. +See [Running examples locally](../../running-examples-locally.md) for details about running the application in the IDE. ### Running on Amazon Managed Service for Apache Flink -TBD -(set up VPC connectivity) +To run the application in Amazon Managed Service for Apache Flink make sure the application configuration has the following: +* VPC networking +* The selected Subnets can route traffic to both the source and destination databases +* The Security Group allow traffic from the application to both source and destination databases + ### Runtime configuration -TBD +When running on Amazon Managed Service for Apache Flink the runtime configuration is read from *Runtime Properties*. + +When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder. + +Runtime parameters: + +| Group ID | Key | Description | +|-------------|-----------------|----------------------------------------------------------------------------------------------------------------------------| +| `CDCSource` | `hostname` | Source database DNS hostname or IP | +| `CDCSource` | `port` | Source database port (normally `1433`) | +| `CDCSource` | `username` | Source database username. 
The user must be `dbo` of the database | +| `CDCSource` | `password` | Source database password | +| `CDCSource` | `database.name` | Source database name | +| `CDCSource` | `table.name` | Source table name. e.g. `dbo.Customers` | +| `JdbcSink` | `url` | Destination database JDBC URL. e.g. `jdbc:postgresql://localhost:5432/targetdb`. Note: the URL includes the database name. | +| `JdbcSink` | `table.name` | Destination table. e.g. `customers` | +| `JdbcSink` | `username` | Destination database user | +| `JdbcSink` | `password` | Destination database password | + +### Known limitations +Using the SQL interface of Flink CDC Sources greatly simplify the implementation of a passthrough application. +However, schema changes in the source table are ignored. ## References From c737444e69088dbfe4512575f142e47ce41e4591 Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Fri, 27 Jun 2025 08:49:59 +0100 Subject: [PATCH 6/6] Fix readme and comments --- .../FlinkCDCSQLServerSource/README.md | 30 +++++++++---------- .../msf/FlinkCDCSqlServer2JdbcJob.java | 9 ++++-- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md index dc7da312..b1658320 100644 --- a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md @@ -2,10 +2,10 @@ This example shows how to capture data from a database (SQL Server in this case) directly from Flink using a Flink CDC source connector. -Flink version: 1.20 -Flink API: SQL -Language: Java (11) -Flink connectors: Flink CDC SQL Server source (3.4), JDBC sink +* Flink version: 1.20 +* Flink API: SQL +* Language: Java (11) +* Flink connectors: Flink CDC SQL Server source (3.4), JDBC sink The job is implemented in SQL embedded in Java. 
It uses the [Flink CDC SQL Server source connector](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/sqlserver-cdc/) @@ -14,7 +14,7 @@ Changes are propagated to the destination database using [JDBC Sink connector](h ### Source database -This example uses Ms SQL Server as source database. To use a different database ad CDC source you need to use a different Flink CDC Source connector. +This example uses Ms SQL Server as source database. To use a different database as CDC source you need to switch to a different Flink CDC Source connector. Different Flink CDC Sources require different configurations and support different metadata fields. To switch the source to a different database you need to modify the code. @@ -26,7 +26,7 @@ See [Flink CDC Sources documentation](https://nightlies.apache.org/flink/flink-c Note that the JDBC sink is agnostic to the actual destination database technology. This example is tested with both MySQL and PostgreSQL but can be easily adjusted to different databases. -The `JdbcSink` `url` decides the destination database (see [Runtime configuration](#runtime-configuration), below). +The `url` property in the `JdbcSink` configuration group decides the destination database (see [Runtime configuration](#runtime-configuration), below). The correct JDBC driver must be included in the `pom.xml`. This example includes both MySQL and PostgreSQL drivers. ### Testing with local databases using Docker Compose @@ -34,7 +34,7 @@ The correct JDBC driver must be included in the `pom.xml`. This example includes This example can be run locally using Docker. A [Docker Compose file](./docker/docker-compose.yml) is provided to run local SQL Server, MySQL and PostgreSQL databases. -The local databases are initialized creating users, databases and tables. Some initial data are also inserted into the source table. +The local databases are initialized by creating users, databases and tables. 
Some initial records are also inserted into the source table. You can run the Flink application inside your IDE following the instructions in [Running in IntelliJ](#running-in-intellij). The default local configuration connects to the local PostgreSQL db defined in Docker Compose. @@ -48,16 +48,16 @@ Use `docker compose down -v` to shut them down, also removing the data volumes. When running on Amazon Managed Service for Apache Flink and with databases on AWS, you need to set up the databases manually, ensuring you set up all the following: -> You can find the SQL scripts which set up the dockerized databases checking out the init scripts for +> You can find the SQL scripts that set up the dockerized databases by checking out the init scripts for > [SQL Server](docker/sqlserver-init/init.sql), [MySQL](docker/mysql-init/init.sql), > and [PostgreSQL](docker/postgres-init/init.sql). 1. **Source database (Ms SQL Server)** 1. SQL Server Agent must be running 2. Native (user/password) authentication must be enabled - 3. The login used by FlinCDC (e.g. `flink_cdc`) must be `db_owner` for the database - 4. The source database and table must match the `database.name`and `table.name` you specify in the source configuration (e.g. `SampleDataPlatform` and `Customers`) - 5. The source table with must have this schema: + 3. The login used by Flink CDC (e.g. `flink_cdc`) must be `db_owner` for the database + 4. The source database and table must match the `database.name` and `table.name` you specify in the source configuration (e.g. `SampleDataPlatform` and `Customers`) + 5. The source table must have this schema: ```sql CREATE TABLE [dbo].[Customers] ( @@ -69,7 +69,7 @@ When running on Amazon Managed Service for Apache Flink and with databases on AW CONSTRAINT [CustomerPK] PRIMARY KEY CLUSTERED ([CustomerID] ASC) ) ON [PRIMARY]; ``` - 6. CDC must be enabled both on the source database. on Amazon RDS SQL Server use the following stored procedure: + 6.
CDC must be enabled on the source database. On Amazon RDS SQL Server use the following stored procedure: ```sql exec msdb.dbo.rds_cdc_enable_db 'MyDB' ``` @@ -115,7 +115,7 @@ See [Running examples locally](../../running-examples-locally.md) for details ab To run the application in Amazon Managed Service for Apache Flink make sure the application configuration has the following: * VPC networking * The selected Subnets can route traffic to both the source and destination databases -* The Security Group allow traffic from the application to both source and destination databases +* The Security Group allows traffic from the application to both source and destination databases ### Runtime configuration @@ -130,7 +130,7 @@ Runtime parameters: |-------------|-----------------|----------------------------------------------------------------------------------------------------------------------------| | `CDCSource` | `hostname` | Source database DNS hostname or IP | | `CDCSource` | `port` | Source database port (normally `1433`) | -| `CDCSource` | `username` | Source database username. The user must be `dbo` of the database | +| `CDCSource` | `username` | Source database username. The user must be `db_owner` of the database | | `CDCSource` | `password` | Source database password | | `CDCSource` | `database.name` | Source database name | | `CDCSource` | `table.name` | Source table name. e.g. `dbo.Customers` | @@ -141,7 +141,7 @@ Runtime parameters: ### Known limitations -Using the SQL interface of Flink CDC Sources greatly simplify the implementation of a passthrough application. +Using the SQL interface of Flink CDC Sources greatly simplifies the implementation of a passthrough application. However, schema changes in the source table are ignored.
## References diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServer2JdbcJob.java b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServer2JdbcJob.java index 6a9f4452..1fbc14fd 100644 --- a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServer2JdbcJob.java +++ b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServer2JdbcJob.java @@ -104,8 +104,9 @@ public static void main(String[] args) throws Exception { " 'password' = '" + Preconditions.checkNotNull(jdbcSinkProperties.getProperty("password"), "missing destination database password") + "'" + ")"); - // While developing locally it may be useful to also print the output to console. - // When the job is running on Managed Flink any output to console is not visible and it would cause overhead + // When running locally we add a secondary sink to print the output to the console. + // When the job is running on Managed Flink any output to console is not visible and may cause overhead. + // It is recommended not to print any output to the console when running the application on Managed Flink. if( isLocal(env)) { tableEnv.executeSql("CREATE TABLE PrintSinkTable (" + " CustomerID INT," + @@ -124,7 +125,9 @@ public static void main(String[] args) throws Exception { ")"); } - + // Note that we use a statement set to add the two "INSERT INTO..." statements. + // When tableEnv.executeSQL(...) is used with INSERT INTO on a job running in Application mode, like on Managed Flink, + // the first statement triggers the job execution, and any code which follows is ignored. StreamStatementSet statementSet = tableEnv.createStatementSet(); statementSet.addInsertSql("INSERT INTO DestinationTable (" + "customer_id, " +