Generate and Compare Debezium Change Data Capture (CDC) Avro Schemas.
Leveraging Debezium's and Schema Registry's own codebases, Skemium maps each Table of a Database to 3 components:

- Key Avro schema: describes the `PRIMARY KEY` of the Table (`NULL` if not set)
- Value Avro schema: describes each Row of the Table
- Envelope Avro schema: wrapper for the Value, used by Debezium to realize CDC when producing to a Topic
Debezium CDC Source Connector uses the Key and the Envelope schemas when producing to a Topic: the former is used for the Message Key, the latter for the Message Payload.
Skemium leverages those schemas to compare evolutions of the originating Database Schema, identifying compatibility issues by executing the comparison logic implemented by Schema Registry.
If you make changes to your Database Schema and want to know whether they are going to break your Debezium CDC production, `skemium` is the tool for you.
In our experience, the way Debezium works can catch users off guard in 2 major ways:
- Making changes to the source Database Schema in ways that break Schema Compatibility
- A non-zero amount of time passing between making changes to the source Database Schema, and those changes being captured by Debezium and published to Schema Registry
Avoiding the first is made much harder by the second!
There is sometimes confusion between making a “DB Schema” change vs making a “Schema Registry Schema” change:

- the former happens when developers apply changes to their RDBMS: usually, before their application code starts relying on the new schema
- the latter happens when data is actually updated in one of the changed tables:
  1. Debezium detects it (reading the RDBMS WAL and the `DESCRIBE TABLE` command)
  2. Debezium's `Producer` attempts to create a new Schema version for the associated schema subject:
     1. either it fails, if the change violates the configured Schema Compatibility
     2. or it succeeds in publishing a new version of the schema subject
  3. Debezium's `Producer` resumes producing to the related Kafka Topic
When 2.1. above happens, Debezium stops producing and, in turn, stops consuming the RDBMS WAL:
- Traffic from the RDBMS to Kafka halts (bad!)
- RDBMS storage fills up, as the WAL is not getting flushed (worse!)
Skemium's primary objective is to address this issue, empowering developers to instrument their CI process to detect breakage of a CDC schema subject as early as possible.
Ideally, when a PR is submitted and before any RDBMS schema has been changed in production, it should be possible to:
- spin up a local instance of the RDBMS
- apply the latest desired DB Schema
- execute `skemium generate` to obtain the corresponding Avro Schemas (i.e. what would eventually land in Schema Registry)
- execute `skemium compare` to compare an existing copy of the Avro Schemas with the new one, applying the desired Schema Compatibility (a minimal sketch follows below)
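A minimal sketch of those CI steps; connection details, credentials and directory names are hypothetical:

```sh
# 1. generate the CDC Avro Schemas from the locally migrated database
skemium generate -h localhost -p 5432 -d example -u ci --password "$DB_PASSWORD" next_schemas/

# 2. compare against the schemas generated before the change:
#    exits non-zero if the desired Schema Compatibility is violated
skemium compare current_schemas/ next_schemas/
```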
Sometimes it is going to be inevitable: you need to make a change in your Database Schema, and a new version of the schema subject must be released - a version that breaks Schema Compatibility.
This is beyond the scope of Skemium (for now?), but in those situations what you can do is something along the lines of:
1. Make a coordinated plan with Consumer Services of the CDC Topic
2. Temporarily disable Schema Compatibility in Schema Registry
3. Let Debezium publish a new schema subject version
4. Restore Schema Compatibility
The details will depend on your specific circumstances, and your mileage may vary ¯\_(ツ)_/¯.
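For instance, steps 2 and 4 can be done through Schema Registry's `/config/<subject>` REST endpoint; the hostname and subject name below are hypothetical:

```sh
# Temporarily disable compatibility checks for the affected subject (step 2)
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "NONE"}' \
  http://schema-registry:8081/config/example.public.user-value

# ... let Debezium publish the new, breaking schema version (step 3) ...

# Restore the previous compatibility level (step 4)
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "BACKWARD"}' \
  http://schema-registry:8081/config/example.public.user-value
```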
Skemium does not implement its own schema extraction or serialization logic: it relies instead on the source code of Debezium and Schema Registry. Specifically, the following 2 packages do the bulk of the work:
```xml
<!-- https://mvnrepository.com/artifact/io.debezium/debezium-core -->
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-core</artifactId>
  <version>${ver.debezium}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.confluent/kafka-connect-avro-converter -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-connect-avro-converter</artifactId>
  <version>${ver.kafka-connect-avro-converter}</version>
</dependency>
```
To dig deeper, please look at the `pom.xml`.
Skemium's design brings together the Debezium and Schema Registry codebases, using the Apache Avro codebase as lingua franca:
- `TableSchemaFetcher` extracts a `List<io.debezium.relational.TableSchema>`, using Debezium's RDBMS-specific connector source code to connect and query the DB schema
- A `TableAvroSchemas` is created for each `io.debezium.relational.TableSchema`, invoking the provided methods that extract Key, Value and Envelope as `org.apache.kafka.connect.data.Schema`
- Each `org.apache.kafka.connect.data.Schema` is converted to `io.confluent.kafka.schemaregistry.avro.AvroSchema` via the provided constructor
- `io.confluent.kafka.schemaregistry.CompatibilityChecker` is used to compare current and next versions of `io.confluent.kafka.schemaregistry.avro.AvroSchema`, applying the desired `io.confluent.kafka.schemaregistry.CompatibilityLevel`
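To make this concrete, here is a minimal sketch (not Skemium's actual code) of the last 2 steps, using only classes from the 2 dependencies listed above; note that the return type of `isCompatible` has varied across Schema Registry versions (older versions returned a boolean):

```java
import io.confluent.connect.avro.AvroData;
import io.confluent.kafka.schemaregistry.CompatibilityChecker;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import org.apache.kafka.connect.data.Schema;

import java.util.List;

public class CompatibilitySketch {

  // Convert a Kafka Connect schema (as extracted by Debezium) into a
  // Schema Registry AvroSchema, with Apache Avro as the lingua franca
  static AvroSchema toAvroSchema(Schema connectSchema) {
    AvroData avroData = new AvroData(16); // 16 = internal schema cache size
    return new AvroSchema(avroData.fromConnectSchema(connectSchema).toString());
  }

  // Check NEXT against CURRENT; static checkers exist for each
  // CompatibilityLevel (BACKWARD_CHECKER, FORWARD_CHECKER, FULL_CHECKER, ...).
  // An empty list of messages means the schemas are compatible.
  static List<String> incompatibilities(AvroSchema current, AvroSchema next) {
    return CompatibilityChecker.BACKWARD_CHECKER.isCompatible(next, List.of(current));
  }
}
```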
This gives us confidence that both the generation of the Avro Schema and the compatibility check are exactly the same as those Debezium will apply in production.
Skemium can be compiled from source, but for convenience we release binaries:
- `skemium-${VER}-jar-with-dependencies`: an uber-jar, easy to use in an environment where a JRE is present
- `skemium-${VER}-${OS}-${ARCH}`: native binary, generated via GraalVM (see below)

All binaries are generated when a new tag is pushed to the `main` branch.
The `generate` command connects to a Database, reads its Database Schema and converts it to a CDC Avro Schema,
using Debezium Avro Serialization.
The output is saved in a user-given output directory. The directory will contain:
- For each Table, a set of files following the naming structure `DB_NAME.DB_SCHEMA.DB_TABLE.EXTENSION`:
  - Table Key schema file (`EXTENSION = .key.avsc`)
  - Table Value schema file (`EXTENSION = .val.avsc`)
  - Table Envelope schema file (`EXTENSION = .env.avsc`)
  - The checksum of all 3 schema files above (`EXTENSION = .sha256`)
- A metadata file named `.skemium.meta.json` (schema)
For example, if the database `example` contains 2 tables `user` and `address` in the database schema `public`, the output directory will look like:
```
$ tree example_schema_dir/
example_schema_dir/
├── .skemium.meta.json
├── example.public.address.env.avsc
├── example.public.address.key.avsc
├── example.public.address.sha256
├── example.public.address.val.avsc
├── example.public.user.env.avsc
├── example.public.user.key.avsc
├── example.public.user.sha256
└── example.public.user.val.avsc
```
Run `skemium help generate` for usage instructions
```
$ skemium help generate
Generates Avro Schema from Tables in a Database

skemium generate [-v] -d=<dbName> -h=<hostname> [--kind=<kind>] -p=<port> --password=<password> -u=<username>
                 [-s=<dbSchemas>[,<dbSchemas>...]]... [-t=<dbTables>[,<dbTables>...]]...
                 [-x=<dbExcludedColumns>[,<dbExcludedColumns>...]]... [DIRECTORY_PATH]

Description:
Connects to Database, finds schemas and tables,
converts table schemas to Avro Schemas, stores them in a directory.

Parameters:
  [DIRECTORY_PATH]           Output directory
                               Default: skemium-20250610-161400

Options:
  -d, --database=<dbName>    Database name (env: DB_NAME)
  -h, --hostname=<hostname>  Database hostname (env: DB_HOSTNAME)
      --kind=<kind>          Database kind (env: DB_KIND - optional)
                               Values: POSTGRES
                               Default: POSTGRES
  -p, --port=<port>          Database port (env: DB_PORT)
      --password=<password>  Database password (env: DB_PASSWORD)
  -s, --schema=<dbSchemas>[,<dbSchemas>...]
                             Database schema(s); all if omitted (env: DB_SCHEMA - optional)
  -t, --table=<dbTables>[,<dbTables>...]
                             Database table(s); all if omitted (fmt: DB_SCHEMA.DB_TABLE|DB_TABLE - env: DB_TABLE - optional)
  -u, --username=<username>  Database username (env: DB_USERNAME)
  -v, --verbose              Logging Verbosity - use multiple -v to increase (default: ERROR)
  -x, --exclude-column=<dbExcludedColumns>[,<dbExcludedColumns>...]
                             Database table column(s) to exclude (fmt: DB_SCHEMA.DB_TABLE.DB_COLUMN - env: DB_EXCLUDED_COLUMN - optional)
```
`generate` offers options to filter parts of the Database Schema:

- `-s | --schema`: only include selected schema(s) in the output
  - Format: `-s DB_SCHEMA`
  - Cardinality: `1..*`, either repeat the option OR use a comma (`,`) separator
- `-t | --table`: only include selected table(s) in the output
  - Format: `-t DB_TABLE` or `-t DB_SCHEMA.DB_TABLE`
  - Cardinality: `1..*`, either repeat the option OR use a comma (`,`) separator
  - Specificity:
    - `DB_TABLE`: includes all tables of that name, across all selected schemas
    - `DB_SCHEMA.DB_TABLE`: includes only a table of that name, in the specific schema
- `-x | --exclude-column`: exclude selected column(s) from the output
  - Format: `-x DB_SCHEMA.DB_TABLE.DB_COLUMN`
  - Cardinality: `1..*`, either repeat the option OR use a comma (`,`) separator
Where:

- `DB_SCHEMA` is the name of a schema defined in the database (e.g. `public`)
- `DB_TABLE` is the name of a table in the database
- `DB_COLUMN` is the name of a column in the database
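For example, the following invocation (database, credentials and names are hypothetical) limits generation to the `public` schema, selects 2 tables, and excludes a sensitive column:

```sh
# Include only schema `public`, only tables `user` and `address`,
# and exclude the `password_hash` column of `public.user`
skemium generate \
  -h localhost -p 5432 -d example -u postgres --password "$DB_PASSWORD" \
  -s public \
  -t user,address \
  -x public.user.password_hash \
  example_schema_dir/
```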
The `compare` command takes 2 directories (created via `generate`) containing the CDC Avro Schema of a Database,
and compares them applying the given Schema Compatibility type.

The directories are identified as `CURRENT` and `NEXT`:

- `CURRENT`: CDC Avro Schema of a Database, generated at time `T`
- `NEXT`: CDC Avro Schema of the Database, generated at time `T+1`
`compare` executes a table-by-table Schema Compatibility check, and reports on the result.
Exit Code will be `0` in case of success, `1` otherwise.
Additionally, `compare` reports (via `WARN` logging) any table discrepancies (additions/removals)
identified between `CURRENT` and `NEXT`.
The flag `--ci-mode` can be used to force a failure in case of these discrepancies:
this is ideally used in the context of CI automation.
If necessary, the output of `compare` can be stored in an output JSON file, using the `--output` option (schema).
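For example (hypothetical directory names), a CI-friendly invocation that applies `FULL` compatibility, fails on table discrepancies, and stores the result as JSON:

```sh
skemium compare -c FULL --ci-mode -o compare-result.json current_schemas/ next_schemas/
echo $?   # 0 = success, 1 = compatibility issues (or discrepancies, given --ci-mode)
```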
Run `skemium help compare` for usage instructions
```
$ skemium help compare
Compares Avro Schemas generated from Tables in a Database

skemium compare [-iv] [-c=<compatibilityLevel>] [-o=<output>] CURR_SCHEMAS_DIR NEXT_SCHEMAS_DIR

Description:
Given 2 directories (CURRENT / NEXT) containing Avro Schemas of Database Tables,
compares them according to Compatibility Level.

Parameters:
  CURR_SCHEMAS_DIR           Directory with the CURRENT Database Table schemas
  NEXT_SCHEMAS_DIR           Directory with the NEXT Database Table schemas

Options:
  -c, --compatibility=<compatibilityLevel>
                             Compatibility Level (env: COMPATIBILITY - optional)
                               See: https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html
                               Values: NONE, BACKWARD, BACKWARD_TRANSITIVE, FORWARD, FORWARD_TRANSITIVE, FULL, FULL_TRANSITIVE
                               Default: BACKWARD
  -i, --ci, --ci-mode        CI mode - Fail when a Table is only detected in one of the two directories (env: CI_MODE - optional)
                               Default: false
  -o, --output=<output>      Output file (JSON); overridden if exists (env: OUTPUT_FILE - optional)
  -v, --verbose              Logging Verbosity - use multiple -v to increase (default: ERROR)
```
The option `-v | --verbose` (available for all commands) controls the logging verbosity.
By default, the logging level is `ERROR`, but it can be increased by passing one or more `-v` options, to a maximum level of `TRACE`. The mapping is:
```
<none>     -> ERROR
-v         -> WARN
-vv        -> INFO
-vvv       -> DEBUG
-vvvv      -> TRACE
-vvvvv ... -> TRACE
```
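For example, to run `generate` with `INFO`-level logging (connection details are hypothetical):

```sh
# -vv maps to INFO, per the table above
skemium generate -vv -h localhost -p 5432 -d example -u postgres --password "$DB_PASSWORD" out/
```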
In a kinda meta twist, Skemium command outputs also have their own schemas.
The output of the `generate` command includes the metadata file `.skemium.meta.json`.
This summarises the output of the command, with information about the moment in time, the local repository, and more.
The schema for this file is at `<prj_root>/schemas/skemium.generate.meta.avsc`.
The `compare` command can optionally save its result to an `--output` file.
This summarises what was compared, what Schema Compatibility was applied,
and what issues (if any) were identified by the comparison.
The schema for this file is at `<prj_root>/schemas/skemium.compare.result.avsc`.
Here are some major features that we haven't had time to tackle yet:
- Support for additional Databases (MySQL, MariaDB, MongoDB, Oracle, SQL Server, ...): currently only PostgreSQL is supported
- Support connecting to GCP CloudSQL databases via dedicated `SocketFactory`
- Support custom key definition for a table, similar to what `message.key.columns` allows when configuring Debezium. This is useful when an arbitrary key is desired or the table is missing a primary key.
- Support for generating and comparing JSON Schemas
- Support for generating and comparing Protobuf schemas
Of course, small contributions and bugfixes are also very welcome.
- Maven 3.9+
- JDK 21+
We recommend using asdf to set up your local development environment, as it makes it very easy to get started:

```sh
asdf install
```
```sh
# Build and run tests
$ mvn clean package

# Build the uber-jar, skipping tests
$ mvn clean package assembly:single -DskipTests
```
Note: for this option, you need to use GraalVM. If you are using asdf, edit the `.tool-versions` file and uncomment
the line to switch to `oracle-graalvm`; then, run `asdf install` again.
```sh
$ mvn clean package native:compile-no-fork -DskipTests
```
`native-image` is what GraalVM uses to compile Java code into native executables.
The default options used by `native-image` are OK, but one option determines the exact set of CPU features the compilation
targets: `-march` (documented in the `native-image` build options).
Output of `-march=list` on `x86_64`
```
On AMD64, the following machine types are available:

  'compatibility'
    CPU features: all of 'x86-64'
  'haswell'
    CPU features: all of 'x86-64' + SSE3 + SSSE3 + SSE4_1 + SSE4_2 + POPCNT + LZCNT + AVX + AVX2 + AES + CLMUL + BMI1 + BMI2 + FMA
  'native'
    CPU features: CX8 + CMOV + FXSR + HT + MMX + AMD_3DNOW_PREFETCH + SSE + SSE2 + SSE3 + SSSE3 + SSE4A + SSE4_1 + SSE4_2 + POPCNT + LZCNT + TSC + TSCINV_BIT + AVX + AVX2 + AES + ERMS + CLMUL + BMI1 + BMI2 + ADX + SHA + FMA + VZEROUPPER + FLUSH + FLUSHOPT + HV + RDTSCP + RDPID + FSRM + F16C + CET_SS
  'skylake'
    CPU features: all of 'haswell' + AMD_3DNOW_PREFETCH + ADX + FLUSHOPT
  'skylake-avx512'
    CPU features: all of 'skylake' + AVX512F + AVX512DQ + AVX512CD + AVX512BW + AVX512VL + CLWB
  'x86-64'
    CPU features: CX8 + CMOV + FXSR + MMX + SSE + SSE2
  'x86-64-v1'
    CPU features: all of 'x86-64'
  'x86-64-v2'
    CPU features: all of 'x86-64-v1' + SSE3 + SSSE3 + SSE4_1 + SSE4_2 + POPCNT
  'x86-64-v3'
    CPU features: all of 'x86-64-v2' + LZCNT + AVX + AVX2 + BMI1 + BMI2 + FMA
  'x86-64-v4'
    CPU features: all of 'x86-64-v3' + AVX512F + AVX512DQ + AVX512CD + AVX512BW + AVX512VL
```
Output of `-march=list` on `aarch64`
```
On AArch64, the following machine types are available:

  'armv8-a'
    CPU features: FP + ASIMD
  'armv8.1-a'
    CPU features: all of 'armv8-a' + CRC32 + LSE
  'compatibility'
    CPU features: all of 'armv8-a'
  'native'
    CPU features: FP + ASIMD + EVTSTRM + AES + PMULL + SHA1 + SHA2 + CRC32 + LSE + DCPOP + SHA3 + SHA512 + SVE + PACA + SVEBITPERM + SVE2

The option also supports one or more feature modifiers via the form '-march=arch{+[no]feature}*'. Example: 'armv8.1-a+lse' enables Large System Extension instructions.
The following feature modifiers are available: 'aes', 'lse', 'fp', 'simd'.
```
After a bit of experimentation, we determined that `-march=compatibility` was the best choice.
If you have taken advantage of the asdf setup (i.e. `asdf install`), you have already installed Task (https://taskfile.dev/).
The most frequently used tasks are already configured in the Taskfile. Give it a go!
As with any open source tool, this builds on the shoulders of the great work of others (see the `pom.xml`).
But I want to especially thank 2 projects for the core of the functionality:
- Debezium, providing logic to extract database table schemas
- Confluent Schema Registry, providing logic to convert to/from Avro Schemas
Made with 💜 by Snyk