diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md
new file mode 100644
index 0000000..b165832
--- /dev/null
+++ b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md
@@ -0,0 +1,150 @@
+# FlinkCDC SQL Server source example
+
+This example shows how to capture data from a relational database (SQL Server, in this case) directly in Flink, using a Flink CDC source connector.
+
+* Flink version: 1.20
+* Flink API: SQL
+* Language: Java (11)
+* Flink connectors: Flink CDC SQL Server source (3.4), JDBC sink
+
+The job is implemented in SQL embedded in Java.
+It uses the [Flink CDC SQL Server source connector](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/sqlserver-cdc/)
+to capture changes from a database.
+Changes are propagated to the destination database using [JDBC Sink connector](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/jdbc/).
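+
+Stripped of the configuration plumbing, the job boils down to two SQL statements: a `CREATE TABLE ... WITH ('connector' = 'sqlserver-cdc', ...)` declaring the CDC source, and an `INSERT INTO ... SELECT` propagating changes into the JDBC sink table. The sketch below (abridged columns, placeholder connection values) shows the shape of the SQL the Java code assembles:
+
+```sql
+-- CDC source table (abridged; see the Java code for the full definition, including metadata columns)
+CREATE TABLE Customers (
+    CustomerID INT,
+    FirstName STRING,
+    LastName STRING,
+    PRIMARY KEY (CustomerID) NOT ENFORCED
+) WITH (
+    'connector' = 'sqlserver-cdc',
+    'hostname' = '<source-db-host>',
+    'port' = '1433',
+    'username' = '<user>',
+    'password' = '<password>',
+    'database-name' = 'SampleDataPlatform',
+    'table-name' = 'dbo.Customers'
+);
+
+-- Changes captured from the source continuously flow into the JDBC sink table
+INSERT INTO DestinationTable SELECT CustomerID, FirstName, LastName FROM Customers;
+```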
+
+### Source database
+
+This example uses Microsoft SQL Server as the source database. To use a different database as the CDC source, you need to switch to a different Flink CDC source connector.
+
+Different Flink CDC sources require different configurations and support different metadata fields, so switching the source to a different database requires modifying the code.
+
+See [Flink CDC Sources documentation](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/sqlserver-cdc) for further details.
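+
+For example, switching to the [MySQL CDC source](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/mysql-cdc/) would mainly mean changing the `WITH` clause of the source table definition, along these lines (a sketch with placeholder values; the required options and supported metadata columns differ per connector):
+
+```sql
+WITH (
+    'connector' = 'mysql-cdc',
+    'hostname' = '<source-db-host>',
+    'port' = '3306',
+    'username' = '<user>',
+    'password' = '<password>',
+    'database-name' = '<database>',
+    'table-name' = '<table>'
+)
+```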
+
+
+### Destination database
+
+Note that the JDBC sink is agnostic to the actual destination database technology.
+This example is tested with both MySQL and PostgreSQL but can be easily adjusted to different databases.
+
+The `url` property in the `JdbcSink` configuration group determines the destination database (see [Runtime configuration](#runtime-configuration), below).
+The matching JDBC driver must be included in the `pom.xml`. This example includes both the MySQL and PostgreSQL drivers.
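+
+For instance, these are the URL forms (with placeholder hosts) pointing the sink at the two tested databases:
+
+```
+'url' = 'jdbc:mysql://<mysql-host>:3306/targetdb'
+'url' = 'jdbc:postgresql://<postgres-host>:5432/targetdb'
+```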
+
+### Testing with local databases using Docker Compose
+
+This example can be run locally using Docker.
+
+A [Docker Compose file](./docker/docker-compose.yml) is provided to run local SQL Server, MySQL and PostgreSQL databases.
+The local databases are initialized by creating users, databases and tables. Some initial records are also inserted into the source table.
+
+You can run the Flink application inside your IDE following the instructions in [Running in IntelliJ](#running-in-intellij).
+The default local configuration connects to the local PostgreSQL db defined in Docker Compose.
+
+To start the local databases run `docker compose up -d` in the `./docker` folder.
+
+Use `docker compose down -v` to shut them down, also removing the data volumes.
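+
+Putting the two commands together:
+
+```bash
+cd docker
+docker compose up -d    # start SQL Server, MySQL and PostgreSQL, and run the init scripts
+docker compose down -v  # stop the databases and remove the data volumes
+```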
+
+
+### Database prerequisites
+
+When running on Amazon Managed Service for Apache Flink with databases on AWS, you need to set up the databases manually, ensuring all of the following are in place:
+
+> You can find the SQL scripts that set up the dockerized databases by checking out the init scripts for
+> [SQL Server](docker/sqlserver-init/init.sql), [MySQL](docker/mysql-init/init.sql),
+> and [PostgreSQL](docker/postgres-init/init.sql).
+
+1. **Source database (Microsoft SQL Server)**
+ 1. SQL Server Agent must be running
+ 2. Native (user/password) authentication must be enabled
+ 3. The login used by Flink CDC (e.g. `flink_cdc`) must be `db_owner` for the database
+ 4. The source database and table must match the `database.name` and `table.name` you specify in the source configuration (e.g. `SampleDataPlatform` and `Customers`)
+ 5. The source table must have this schema:
+ ```sql
+ CREATE TABLE [dbo].[Customers]
+ (
+ [CustomerID] [int] IDENTITY (1,1) NOT NULL,
+ [FirstName] [nvarchar](40) NOT NULL,
+ [MiddleInitial] [nvarchar](40) NULL,
+ [LastName] [nvarchar](40) NOT NULL,
+ [mail] [varchar](50) NULL,
+ CONSTRAINT [CustomerPK] PRIMARY KEY CLUSTERED ([CustomerID] ASC)
+ ) ON [PRIMARY];
+ ```
+   6. CDC must be enabled on the source database. On Amazon RDS for SQL Server, use the following stored procedure:
+ ```sql
+ exec msdb.dbo.rds_cdc_enable_db 'MyDB'
+ ```
+   On self-managed SQL Server you need to call a different procedure, while connected to the database:
+ ```sql
+ USE MyDB;
+ EXEC sys.sp_cdc_enable_db;
+ ```
+   7. CDC must also be enabled on the table (see the verification queries after this list):
+ ```sql
+ EXEC sys.sp_cdc_enable_table
+ @source_schema = N'dbo',
+ @source_name = N'Customers',
+ @role_name = NULL,
+ @supports_net_changes = 0;
+ ```
+2. **Destination database (MySQL or PostgreSQL)**
+ 1. The destination database name must match the `url` configured in the JDBC sink
+   2. The destination table must have the following schema:
+ ```sql
+ CREATE TABLE customers (
+ customer_id INT PRIMARY KEY,
+ first_name VARCHAR(40),
+ middle_initial VARCHAR(40),
+ last_name VARCHAR(40),
+ email VARCHAR(50),
+ _source_updated_at TIMESTAMP,
+ _change_processed_at TIMESTAMP
+ );
+ ```
+ 3. The destination database user must have SELECT, INSERT, UPDATE and DELETE permissions on the destination table
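+
+On the SQL Server side, a quick way to verify the CDC prerequisites (points 6 and 7 above) is to query the standard catalog views:
+
+```sql
+-- 1 = CDC is enabled at the database level
+SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'SampleDataPlatform';
+
+-- 1 = CDC is enabled at the table level
+USE SampleDataPlatform;
+SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'Customers';
+```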
+
+### Running in IntelliJ
+
+You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
+Run the databases locally using Docker Compose, as described [above](#testing-with-local-databases-using-docker-compose).
+
+See [Running examples locally](../../running-examples-locally.md) for details about running the application in the IDE.
+
+
+### Running on Amazon Managed Service for Apache Flink
+
+To run the application in Amazon Managed Service for Apache Flink, make sure the application configuration has the following:
+* VPC networking
+* The selected Subnets can route traffic to both the source and destination databases
+* The Security Group allows traffic from the application to both source and destination databases
+
+
+### Runtime configuration
+
+When running on Amazon Managed Service for Apache Flink the runtime configuration is read from *Runtime Properties*.
+
+When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder.
+
+Runtime parameters:
+
+| Group ID | Key | Description |
+|-------------|-----------------|----------------------------------------------------------------------------------------------------------------------------|
+| `CDCSource` | `hostname` | Source database DNS hostname or IP |
+| `CDCSource` | `port` | Source database port (normally `1433`) |
+| `CDCSource` | `username`      | Source database username. The user must be `db_owner` of the database                                                       |
+| `CDCSource` | `password` | Source database password |
+| `CDCSource` | `database.name` | Source database name |
+| `CDCSource` | `table.name` | Source table name. e.g. `dbo.Customers` |
+| `JdbcSink` | `url` | Destination database JDBC URL. e.g. `jdbc:postgresql://localhost:5432/targetdb`. Note: the URL includes the database name. |
+| `JdbcSink` | `table.name` | Destination table. e.g. `customers` |
+| `JdbcSink` | `username` | Destination database user |
+| `JdbcSink` | `password` | Destination database password |
+
+### Known limitations
+
+Using the SQL interface of Flink CDC Sources greatly simplifies the implementation of a passthrough application.
+However, schema changes in the source table are ignored.
+
+## References
+
+* [Flink CDC SQL Server documentation](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/sqlserver-cdc)
+* [Debezium SQL Server documentation](https://debezium.io/documentation/reference/1.9/connectors/sqlserver.html)
\ No newline at end of file
diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml
new file mode 100644
index 0000000..ca6abb4
--- /dev/null
+++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/docker-compose.yml
@@ -0,0 +1,77 @@
+services:
+
+ # Ms SQL Server + init
+ sqlserver:
+ image: mcr.microsoft.com/mssql/server:2022-latest
+ container_name: mssql-server-2022
+ environment:
+ - ACCEPT_EULA=Y
+ - SA_PASSWORD=YourStrong@Passw0rd
+ - MSSQL_PID=Developer
+ - MSSQL_AGENT_ENABLED=true
+ ports:
+ - "1433:1433"
+ volumes:
+ - sqlserver_data:/var/opt/mssql
+ - ./sqlserver-init/init.sql:/tmp/init.sql
+ restart: unless-stopped
+ healthcheck:
+ test: ["CMD-SHELL", "/opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P YourStrong@Passw0rd -Q 'SELECT 1' -C"]
+ interval: 10s
+ timeout: 5s
+ retries: 5
+ start_period: 10s
+
+ sqlserver-init:
+ image: mcr.microsoft.com/mssql/server:2022-latest
+ depends_on:
+ sqlserver:
+ condition: service_healthy
+ volumes:
+ - ./sqlserver-init/init.sql:/tmp/init.sql
+ command: >
+ bash -c "
+ echo 'Waiting for SQL Server to be ready...' &&
+ sleep 5 &&
+ echo 'Running initialization script...' &&
+ /opt/mssql-tools18/bin/sqlcmd -S sqlserver -U sa -P YourStrong@Passw0rd -i /tmp/init.sql -C &&
+ echo 'Initialization completed!'
+ "
+
+ # MySQL database
+ mysql:
+ image: mysql:8.0
+ container_name: mysql_db
+ restart: always
+ environment:
+ MYSQL_ROOT_PASSWORD: R00tpwd!
+ MYSQL_DATABASE: targetdb
+ ports:
+ - "3306:3306"
+ volumes:
+ - mysql_data:/var/lib/mysql
+ - ./mysql-init:/docker-entrypoint-initdb.d
+ command: --default-authentication-plugin=mysql_native_password
+
+ # PostgreSQL database
+ postgres:
+ image: postgres:15
+ container_name: postgres_db
+ restart: always
+ environment:
+ POSTGRES_DB: targetdb
+ POSTGRES_USER: flinkusr
+ POSTGRES_PASSWORD: PassW0rd!
+ ports:
+ - "5432:5432"
+ volumes:
+ - postgres_data:/var/lib/postgresql/data
+ - ./postgres-init:/docker-entrypoint-initdb.d
+
+volumes:
+ sqlserver_data:
+ driver: local
+ mysql_data:
+ driver: local
+ postgres_data:
+ driver: local
diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/mysql-init/init.sql b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/mysql-init/init.sql
new file mode 100644
index 0000000..211e432
--- /dev/null
+++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/mysql-init/init.sql
@@ -0,0 +1,15 @@
+CREATE USER 'flinkusr'@'%' IDENTIFIED BY 'PassW0rd!';
+GRANT SELECT, INSERT, UPDATE, DELETE, SHOW DATABASES ON *.* TO 'flinkusr'@'%';
+
+FLUSH PRIVILEGES;
+
+-- Create customer table
+CREATE TABLE customers (
+ customer_id INT PRIMARY KEY,
+ first_name VARCHAR(40),
+ middle_initial VARCHAR(40),
+ last_name VARCHAR(40),
+ email VARCHAR(50),
+ _source_updated_at TIMESTAMP,
+ _change_processed_at TIMESTAMP
+);
diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/postgres-init/init.sql b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/postgres-init/init.sql
new file mode 100644
index 0000000..05e9051
--- /dev/null
+++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/postgres-init/init.sql
@@ -0,0 +1,10 @@
+-- Create customer table
+CREATE TABLE customers (
+ customer_id INT PRIMARY KEY,
+ first_name VARCHAR(40),
+ middle_initial VARCHAR(40),
+ last_name VARCHAR(40),
+ email VARCHAR(50),
+ _source_updated_at TIMESTAMP,
+ _change_processed_at TIMESTAMP
+);
diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/docker/sqlserver-init/init.sql b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/sqlserver-init/init.sql
new file mode 100644
index 0000000..4fb43a9
--- /dev/null
+++ b/java/FlinkCDC/FlinkCDCSQLServerSource/docker/sqlserver-init/init.sql
@@ -0,0 +1,56 @@
+-- Create SampleDataPlatform database
+CREATE DATABASE SampleDataPlatform;
+GO
+
+-- Use the SampleDataPlatform database
+USE SampleDataPlatform;
+GO
+
+-- Create login for flink_cdc
+CREATE LOGIN flink_cdc WITH PASSWORD = 'FlinkCDC@123';
+GO
+
+-- Create user in SampleDataPlatform database
+CREATE USER flink_cdc FOR LOGIN flink_cdc;
+GO
+
+-- Grant necessary permissions for CDC operations
+ALTER ROLE db_owner ADD MEMBER flink_cdc;
+GO
+
+-- Enable CDC on the SampleDataPlatform database
+USE SampleDataPlatform;
+EXEC sys.sp_cdc_enable_db;
+GO
+
+-- Create Customers table with the specified schema
+CREATE TABLE [dbo].[Customers]
+(
+ [CustomerID] [int] IDENTITY (1,1) NOT NULL,
+ [FirstName] [nvarchar](40) NOT NULL,
+ [MiddleInitial] [nvarchar](40) NULL,
+ [LastName] [nvarchar](40) NOT NULL,
+ [mail] [varchar](50) NULL,
+ CONSTRAINT [CustomerPK] PRIMARY KEY CLUSTERED ([CustomerID] ASC)
+ WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
+) ON [PRIMARY];
+GO
+
+-- Enable CDC on the Customers table
+EXEC sys.sp_cdc_enable_table
+ @source_schema = N'dbo',
+ @source_name = N'Customers',
+ @role_name = NULL,
+ @supports_net_changes = 0;
+GO
+
+-- Insert some sample data
+INSERT INTO [dbo].[Customers] ([FirstName], [MiddleInitial], [LastName], [mail])
+VALUES ('John', 'A', 'Doe', 'john.doe@example.com'),
+ ('Jane', NULL, 'Smith', 'jane.smith@example.com'),
+ ('Bob', 'R', 'Johnson', 'bob.johnson@example.com'),
+ ('Alice', 'M', 'Williams', 'alice.williams@example.com'),
+ ('Charlie', NULL, 'Brown', 'charlie.brown@example.com');
+GO
+
+PRINT 'Database initialization completed successfully!';
diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml b/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml
new file mode 100644
index 0000000..ca8f112
--- /dev/null
+++ b/java/FlinkCDC/FlinkCDCSQLServerSource/pom.xml
@@ -0,0 +1,191 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.amazonaws</groupId>
+    <artifactId>flink-cdc-sqlserver-sql-source</artifactId>
+    <version>1.0</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <buildDirectory>${project.basedir}/target</buildDirectory>
+        <jar.finalName>${project.name}-${project.version}</jar.finalName>
+        <target.java.version>11</target.java.version>
+        <maven.compiler.source>${target.java.version}</maven.compiler.source>
+        <maven.compiler.target>${target.java.version}</maven.compiler.target>
+        <flink.version>1.20.0</flink.version>
+        <flink.cdc.version>3.4.0</flink.cdc.version>
+        <flink.jdbc.connector.version>3.3.0-1.20</flink.jdbc.connector.version>
+        <mysql.jdbc.driver.version>9.3.0</mysql.jdbc.driver.version>
+        <postgresql.jdbc.driver.version>42.7.2</postgresql.jdbc.driver.version>
+        <kda.runtime.version>1.2.0</kda.runtime.version>
+        <log4j.version>2.23.1</log4j.version>
+    </properties>
+
+    <dependencies>
+
+        <!-- Apache Flink dependencies, provided by the runtime -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime-web</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-loader</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Amazon Managed Service for Apache Flink runtime -->
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-kinesisanalytics-runtime</artifactId>
+            <version>${kda.runtime.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Connectors -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqlserver-cdc</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc</artifactId>
+            <version>${flink.jdbc.connector.version}</version>
+        </dependency>
+
+        <!-- JDBC drivers for both supported destination databases -->
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>mysql-connector-j</artifactId>
+            <version>${mysql.jdbc.driver.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${postgresql.jdbc.driver.version}</version>
+        </dependency>
+
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <directory>${buildDirectory}</directory>
+        <finalName>${jar.finalName}</finalName>
+        <plugins>
+            <!-- Java Compiler -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>${target.java.version}</source>
+                    <target>${target.java.version}</target>
+                </configuration>
+            </plugin>
+
+            <!-- Shade plugin to build the uber-jar including all required dependencies -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>com.amazonaws.services.msf.FlinkCDCSqlServer2JdbcJob</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServer2JdbcJob.java b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServer2JdbcJob.java
new file mode 100644
index 0000000..1fbc14f
--- /dev/null
+++ b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/java/com/amazonaws/services/msf/FlinkCDCSqlServer2JdbcJob.java
@@ -0,0 +1,157 @@
+package com.amazonaws.services.msf;
+
+import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamStatementSet;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+public class FlinkCDCSqlServer2JdbcJob {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkCDCSqlServer2JdbcJob.class);
+
+ // Name of the local JSON resource with the application properties in the same format as they are received from the Amazon Managed Service for Apache Flink runtime
+ private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
+
+ private static final int DEFAULT_CDC_DB_PORT = 1433;
+
+ private static boolean isLocal(StreamExecutionEnvironment env) {
+ return env instanceof LocalStreamEnvironment;
+ }
+
+ /**
+ * Load application properties from Amazon Managed Service for Apache Flink runtime or from a local resource, when the environment is local
+ */
+    private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
+ if (isLocal(env)) {
+ LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
+ return KinesisAnalyticsRuntime.getApplicationProperties(
+ FlinkCDCSqlServer2JdbcJob.class.getClassLoader()
+ .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
+ } else {
+ LOG.info("Loading application properties from Amazon Managed Service for Apache Flink");
+ return KinesisAnalyticsRuntime.getApplicationProperties();
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ // set up the streaming execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
+
+        final Map<String, Properties> applicationProperties = loadApplicationProperties(env);
+ LOG.warn("Application properties: {}", applicationProperties);
+
+ // Enable checkpoints and set parallelism when running locally
+ // On Managed Flink, checkpoints and application parallelism are managed by the service and controlled by the application configuration
+ if (isLocal(env)) {
+            env.setParallelism(1); // The SQL Server Flink CDC source is single-threaded
+ env.enableCheckpointing(30000);
+ }
+
+
+ // Create CDC source table
+ Properties cdcSourceProperties = applicationProperties.get("CDCSource");
+ tableEnv.executeSql("CREATE TABLE Customers (" +
+ " CustomerID INT," +
+ " FirstName STRING," +
+ " MiddleInitial STRING," +
+ " LastName STRING," +
+ " mail STRING," +
+ // Some additional metadata columns for demonstration purposes
+ " `_change_processed_at` AS PROCTIME()," + // The time when Flink is processing this record
+ " `_source_updated_at` TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL," + // The time when the operation was executed on the db
+ " `_table_name` STRING METADATA FROM 'table_name' VIRTUAL," + // Name of the table in the source db
+ " `_schema_name` STRING METADATA FROM 'schema_name' VIRTUAL, " + // Name of the schema in the source db
+ " `_db_name` STRING METADATA FROM 'database_name' VIRTUAL," + // name of the database
+ " PRIMARY KEY(CustomerID) NOT ENFORCED" +
+ ") WITH (" +
+ " 'connector' = 'sqlserver-cdc'," +
+ " 'hostname' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("hostname"), "missing CDC source hostname") + "'," +
+ " 'port' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("port", Integer.toString(DEFAULT_CDC_DB_PORT)), "missing CDC source port") + "'," +
+ " 'username' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("username"), "missing CDC source username") + "'," +
+ // For simplicity, we are passing the db password as a runtime configuration unencrypted. This should be avoided in production
+ " 'password' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("password"), "missing CDC source password") + "'," +
+ " 'database-name' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("database.name"), "missing CDC source database name") + "'," +
+ " 'table-name' = '" + Preconditions.checkNotNull(cdcSourceProperties.getProperty("table.name"), "missing CDC source table name") + "'" +
+ ")");
+
+
+ // Create a JDBC sink table
+ // Note that the definition of the table is agnostic to the actual destination database (e.g. MySQL or PostgreSQL)
+ Properties jdbcSinkProperties = applicationProperties.get("JdbcSink");
+ tableEnv.executeSql("CREATE TABLE DestinationTable (" +
+ " customer_id INT," +
+ " first_name STRING," +
+ " middle_initial STRING," +
+ " last_name STRING," +
+ " email STRING," +
+ " _source_updated_at TIMESTAMP(3)," +
+ " _change_processed_at TIMESTAMP(3)," +
+ " PRIMARY KEY(customer_id) NOT ENFORCED" +
+ ") WITH (" +
+ " 'connector' = 'jdbc'," +
+ " 'url' = '" + Preconditions.checkNotNull(jdbcSinkProperties.getProperty("url"), "missing destination database JDBC URL") + "'," +
+ " 'table-name' = '" + Preconditions.checkNotNull(jdbcSinkProperties.getProperty("table.name"), "missing destination database table name") + "'," +
+ " 'username' = '" + Preconditions.checkNotNull(jdbcSinkProperties.getProperty("username"), "missing destination database username") + "'," +
+ " 'password' = '" + Preconditions.checkNotNull(jdbcSinkProperties.getProperty("password"), "missing destination database password") + "'" +
+ ")");
+
+ // When running locally we add a secondary sink to print the output to the console.
+ // When the job is running on Managed Flink any output to console is not visible and may cause overhead.
+ // It is recommended not to print any output to the console when running the application on Managed Flink.
+        if (isLocal(env)) {
+ tableEnv.executeSql("CREATE TABLE PrintSinkTable (" +
+ " CustomerID INT," +
+ " FirstName STRING," +
+ " MiddleInitial STRING," +
+ " LastName STRING," +
+ " mail STRING," +
+ " `_change_processed_at` TIMESTAMP_LTZ(3)," +
+ " `_source_updated_at` TIMESTAMP_LTZ(3)," +
+ " `_table_name` STRING," +
+ " `_schema_name` STRING," +
+ " `_db_name` STRING," +
+ " PRIMARY KEY(CustomerID) NOT ENFORCED" +
+ ") WITH (" +
+ " 'connector' = 'print'" +
+ ")");
+ }
+
+ // Note that we use a statement set to add the two "INSERT INTO..." statements.
+ // When tableEnv.executeSQL(...) is used with INSERT INTO on a job running in Application mode, like on Managed Flink,
+ // the first statement triggers the job execution, and any code which follows is ignored.
+ StreamStatementSet statementSet = tableEnv.createStatementSet();
+ statementSet.addInsertSql("INSERT INTO DestinationTable (" +
+ "customer_id, " +
+ "first_name, " +
+ "middle_initial, " +
+ "last_name, " +
+ "email, " +
+ "_source_updated_at, " +
+ "_change_processed_at" +
+ ") SELECT " +
+ "CustomerID, " +
+ "FirstName, " +
+ "MiddleInitial, " +
+ "LastName, " +
+ "mail, " +
+ "`_source_updated_at`, " +
+ "`_change_processed_at` " +
+ "FROM Customers");
+        if (isLocal(env)) {
+ statementSet.addInsertSql("INSERT INTO PrintSinkTable SELECT * FROM Customers");
+ }
+
+
+ // Execute the two INSERT INTO statements
+ statementSet.execute();
+ }
+}
diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/flink-application-properties-dev.json b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/flink-application-properties-dev.json
new file mode 100644
index 0000000..8974b2f
--- /dev/null
+++ b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/flink-application-properties-dev.json
@@ -0,0 +1,22 @@
+[
+ {
+ "PropertyGroupId": "CDCSource",
+ "PropertyMap": {
+ "hostname": "localhost",
+ "port": "1433",
+ "username": "flink_cdc",
+ "password": "FlinkCDC@123",
+ "database.name": "SampleDataPlatform",
+ "table.name": "dbo.Customers"
+ }
+ },
+ {
+ "PropertyGroupId": "JdbcSink",
+ "PropertyMap": {
+ "table.name": "customers",
+ "url": "jdbc:postgresql://localhost:5432/targetdb",
+ "username": "flinkusr",
+ "password": "PassW0rd!"
+ }
+ }
+]
\ No newline at end of file
diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/log4j2.properties b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..3546643
--- /dev/null
+++ b/java/FlinkCDC/FlinkCDCSQLServerSource/src/main/resources/log4j2.properties
@@ -0,0 +1,7 @@
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/java/FlinkCDC/README.md b/java/FlinkCDC/README.md
new file mode 100644
index 0000000..c714569
--- /dev/null
+++ b/java/FlinkCDC/README.md
@@ -0,0 +1,5 @@
+# Using Flink CDC Sources
+
+This folder contains examples showing how to use Flink CDC sources as source connectors in Amazon Managed Service for Apache Flink.
+
+* [Flink CDC SQL Server source (SQL)](./FlinkCDCSQLServerSource), writing to JDBC sink.
\ No newline at end of file
diff --git a/java/pom.xml b/java/pom.xml
index 54e412e..877b596 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -42,5 +42,6 @@
SQSSink
S3AvroSink
S3AvroSource
+ FlinkCDC/FlinkCDCSQLServerSource
\ No newline at end of file