xataio/pgstream

pgstream - Postgres replication with DDL changes

pgstream is an open source CDC command-line tool and library that offers Postgres replication support with DDL changes to any provided target.

Features

  • Schema change tracking and replication of DDL changes
  • Support for multiple out of the box targets
    • Elasticsearch/OpenSearch
    • Webhooks
    • PostgreSQL
  • Initial and on demand PostgreSQL snapshots (for when you don't need continuous replication)
  • Column value transformations (anonymise your data on the go!)
  • Modular deployment configuration, only requires Postgres
  • Kafka support with schema based partitioning
  • Extendable support for custom targets

Usage

pgstream can be used via the readily available CLI or as a library.

CLI Installation

Binaries

Binaries are available for Linux, macOS and Windows; see our Releases.

From source

To install pgstream from the source, run the following command:

go install github.com/xataio/pgstream@latest

From package manager - Homebrew

To install pgstream with homebrew, run the following command:

# macOS or Linux
brew tap xataio/pgstream
brew install pgstream

Environment setup

If you have an environment available, with at least Postgres and whichever module resources you're planning on running, then you can skip this step. Otherwise, a docker setup is available in this repository that starts Postgres, Kafka and OpenSearch (as well as OpenSearch dashboards for easy visualisation).

docker-compose -f build/docker/docker-compose.yml up

The docker-compose file defines profiles that bring up only the relevant containers. For example, if you only want to run PostgreSQL to PostgreSQL replication, use the pg2pg profile as follows:

docker-compose -f build/docker/docker-compose.yml --profile pg2pg up

You can also run multiple profiles. For example, to start two PostgreSQL instances and Kafka:

docker-compose -f build/docker/docker-compose.yml --profile pg2pg --profile kafka up

List of supported docker profiles:

  • pg2pg
  • pg2os
  • pg2webhook
  • kafka
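
When several profiles are combined, the repeated --profile flags can be generated rather than typed out. The following is a minimal sketch; profile_flags is a hypothetical helper name and is not part of pgstream or docker-compose.

```shell
# Hypothetical helper: turns a list of profile names into repeated
# "--profile <name>" flags, separated by single spaces.
profile_flags() {
  out=""
  for p in "$@"; do
    out="${out:+$out }--profile $p"
  done
  printf '%s\n' "$out"
}

# Show the flags for two of the profiles listed above.
profile_flags pg2pg kafka
```

The result can be spliced into the compose command, for example: docker-compose -f build/docker/docker-compose.yml $(profile_flags pg2pg kafka) up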

Configuration

The pgstream source and target must be configured before the commands can be run. This can be done in any of the following ways:

  • Using the relevant CLI flags for each command
  • Using a yaml configuration file
  • Using environment variables (.env file supported)

Check the documentation for more information about the configuration options, or check the help on the CLI for details on the available flags. Additionally, at the root of this repository you can find sample files for both .env and .yaml.

Prepare the database

The init command creates the pgstream schema in the configured Postgres database, along with the tables, functions and triggers required to keep track of schema changes. See the Tracking schema changes section for more details. It also creates a replication slot for the configured database, which is used by the pgstream service. If no replication slot name is provided, a default name with the format pgstream_<database>_slot is used.

# with CLI flags
pgstream init --postgres-url "postgres://postgres:postgres@localhost?sslmode=disable" --replication-slot test
# with yaml configuration file
pgstream init -c pg2pg.yaml
# with environment configuration file
pgstream init -c pg2pg.env
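
Scripts that inspect or clean up the replication slot need to know its name when none was passed explicitly. The sketch below reproduces the default name format described above; default_slot_name is a hypothetical helper, not a pgstream command.

```shell
# Hypothetical helper: builds the default slot name from a database name,
# following the documented format pgstream_<database>_slot.
default_slot_name() {
  printf 'pgstream_%s_slot\n' "$1"
}

# For a database called "postgres" this prints pgstream_postgres_slot.
default_slot_name postgres
```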

The status of the initialisation and the configuration can be checked with the status command.

pgstream status -c pg2pg.yaml
SUCCESS  pgstream status check encountered no issues
Initialisation status:
 - Pgstream schema exists: true
 - Pgstream schema_log table exists: true
 - Migration current version: 7
 - Migration status: success
 - Replication slot name: pgstream_postgres_slot
 - Replication slot plugin: wal2json
 - Replication slot database: postgres
Config status:
 - Valid: true
Transformation rules status:
 - Valid: true
Source status:
 - Reachable: true

If there are any issues or if you want to revert the pgstream setup, you can use the tear-down command to clean up all pgstream state.

# with CLI flags
pgstream tear-down --postgres-url "postgres://postgres:postgres@localhost?sslmode=disable" --replication-slot test
# with yaml configuration file
pgstream tear-down -c pg2pg.yaml
# with environment configuration file
pgstream tear-down -c pg2pg.env

Run pgstream

Replication mode

The run command starts streaming data from the configured source into the configured target.

Example running pgstream replication from Postgres -> OpenSearch:

# using the environment configuration file
pgstream run -c pg2os.env --log-level trace
# using the yaml configuration file
pgstream run -c pg2os.yaml --log-level info
# using the CLI flags
pgstream run --source postgres --source-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --target opensearch --target-url "http://admin:admin@localhost:9200"

Example running pgstream with Postgres -> Kafka and, in a separate terminal, Kafka -> OpenSearch:

# Terminal 1: Postgres -> Kafka
# using the environment configuration file
pgstream run -c pg2kafka.env --log-level trace
# using the yaml configuration file
pgstream run -c pg2kafka.yaml --log-level info
# using the CLI flags
pgstream run --source postgres --source-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --target kafka --target-url "localhost:9092"

# Terminal 2: Kafka -> OpenSearch
# using the environment configuration file
pgstream run -c kafka2os.env --log-level trace
# using the yaml configuration file
pgstream run -c kafka2os.yaml --log-level info
# using the CLI flags
pgstream run --source kafka --source-url "localhost:9092" --target opensearch --target-url "http://admin:admin@localhost:9200"

Example running pgstream with PostgreSQL -> PostgreSQL with initial snapshot enabled:

# using the environment configuration file
pgstream run -c pg2pg.env --log-level trace
# using the yaml configuration file
pgstream run -c pg2pg.yaml --log-level info
# using the CLI flags
pgstream run --source postgres --source-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --target postgres --target-url "postgres://postgres:postgres@localhost:7654?sslmode=disable" --snapshot-tables test

Snapshot mode

Example running pgstream to perform a snapshot from PostgreSQL -> PostgreSQL:

# using the environment configuration file
pgstream snapshot -c snapshot2pg.env --log-level trace
# using the yaml configuration file
pgstream snapshot -c snapshot2pg.yaml --log-level info
# using the CLI flags
pgstream snapshot --postgres-url="postgres://postgres:postgres@localhost:5432?sslmode=disable" --target=postgres --target-url="postgres://postgres:postgres@localhost:7654?sslmode=disable" --tables="test" --reset

pgstream parses the provided configuration and initialises the relevant modules. It requires at least one source (listener) and one target (processor).

Documentation

For more advanced usage, implementation details, and detailed configuration settings, please refer to the full Documentation.

Limitations

Some of the limitations of the initial release include:

  • Single Kafka topic support
  • Postgres plugin support limited to wal2json
  • No row level filtering support
  • Primary key/unique not null column required for replication
  • Kafka serialisation support limited to JSON
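
Because replication requires a primary key or a unique not null column, it can help to list tables that may not qualify before starting. The sketch below prints a query based on the standard information_schema views. It checks for primary keys only, so a table with a unique not null column but no primary key will still be listed. missing_pk_query is a hypothetical helper name and the connection URL in the usage note is an assumption.

```shell
# Hypothetical helper: prints a query that lists user tables without a
# primary key constraint. Unique not null columns are NOT considered.
missing_pk_query() {
  cat <<'SQL'
SELECT t.table_schema, t.table_name
FROM information_schema.tables t
LEFT JOIN information_schema.table_constraints c
  ON c.table_schema = t.table_schema
 AND c.table_name = t.table_name
 AND c.constraint_type = 'PRIMARY KEY'
WHERE t.table_type = 'BASE TABLE'
  AND t.table_schema NOT IN ('pg_catalog', 'information_schema')
  AND c.constraint_name IS NULL
ORDER BY t.table_schema, t.table_name;
SQL
}

# Print the query so it can be reviewed before running it.
missing_pk_query
```

To run it against a database, pipe it into psql, for example: missing_pk_query | psql "postgres://postgres:postgres@localhost:5432?sslmode=disable"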

Contributing

We welcome contributions from the community! If you'd like to contribute to pgstream, please follow these guidelines:

  • Create an issue for any questions, bug reports, or feature requests.
  • Check the documentation and existing issues before opening a new issue.

Contributing Code

  1. Fork the repository.
  2. Create a new branch for your feature or bug fix.
  3. Make your changes and write tests if applicable.
  4. Ensure your code passes linting and tests.
    • There's a pre-commit configuration available in the root directory (.pre-commit-config.yaml), which can be used to run some of the CI correctness checks locally.
    • Use make test and make integration-test to verify that unit and integration tests pass locally.
    • Use make generate to ensure the generated files are up to date.
  5. Submit a pull request.

For this project, we pledge to act and interact in ways that contribute to an open, welcoming, diverse, inclusive, and healthy community.

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Support

If you have any questions, encounter issues, or need assistance, open an issue in this repository or join our Discord, and our community will be happy to help.


Made with ❤️ by Xata 🦋