Skip to content

Go application to capture inserts into PostgreSQL table and produce events to Apache Kafka using replication slot and WAL

Notifications You must be signed in to change notification settings

jaehoonkim/pg_listener

Β 
Β 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

3 Commits
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

ΠŸΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅ для доставки сообщСний ΠΈΠ· PostgreSQL Π² Apache Kafka Ρ‡Π΅Ρ€Π΅Π· логичСскоС Π΄Π΅ΠΊΠΎΠ΄ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅

Go application to capture inserts to PostgreSQL table and produce events to Apache Kafka using replication slot and WAL

БоСдиняСтся ΠΊ слоту Ρ€Π΅ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΠΈ ΠΈ ΡΠ»ΡƒΡˆΠ°Π΅Ρ‚ INSERT-Ρ‹ Π² ΡƒΠΊΠ°Π·Π°Π½Π½Ρ‹Π΅ Ρ‚Π°Π±Π»ΠΈΡ†Ρ‹, ΠΎΡ‚ΠΊΡƒΠ΄Π° ΠΊΠΎΠΏΠΈΡ€ΡƒΠ΅Ρ‚ Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ topic ΠΈ payload, ΠΏΠΎΡ‚ΠΎΠΌ отправляСт это сообщСниС Π² Kafka. ΠŸΡ€ΠΈΠΌΠ΅Ρ€ Ρ‚Π°Π±Π»ΠΈΡ†Ρ‹ для ΠΏΡ€ΠΎΡΠ»ΡƒΡˆΠΈΠ²Π°Π½ΠΈΡ:

CREATE TABLE queue.events (
    id bigserial primary key,
    added_at timestamp NOT NULL default clock_timestamp(),
    topic text NOT NULL,
    payload json
);

Установка

  • Ρ‚Ρ€Π΅Π±ΡƒΠ΅Ρ‚ установлСнный https://github.com/eulerto/wal2json Π½Π° сСрвСрС PostgreSQL
  • ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ слот Ρ€Π΅ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΠΈ Π½Π° PostgreSQL SELECT pg_create_logical_replication_slot('my_slot','wal2json')
  • ΡƒΡΡ‚Π°Π½ΠΎΠ²ΠΈΡ‚ΡŒ Go 1.9 ΠΈΠ»ΠΈ Π²Ρ‹ΡˆΠ΅
  • ΡƒΡΡ‚Π°Π½ΠΎΠ²ΠΈΡ‚ΡŒ ΠΌΠ΅Π½Π΅Π΄ΠΆΠ΅Ρ€ ΠΏΠ°ΠΊΠ΅Ρ‚ΠΎΠ² https://github.com/golang/dep (Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€: $ GOPATH="/home/user/go"; $ go get -u github.com/golang/dep/cmd/dep)
  • git clone %этот_Ρ€Π΅ΠΏΠΎΠ·ΠΈΡ‚ΠΎΡ€ΠΈΠΉ%
  • cd pg_listener
  • env GOPATH="%ΠΏΠΎΠ»Π½Ρ‹ΠΉ_ΠΏΡƒΡ‚ΡŒ_Π΄ΠΎ_ΠΏΠ°ΠΏΠΊΠΈ_рСпозитория%" go build pg_listener
  • Π˜Π›Π˜ GOPATH="%ΠΏΠΎΠ»Π½Ρ‹ΠΉ_ΠΏΡƒΡ‚ΡŒ_Π΄ΠΎ_ΠΏΠ°ΠΏΠΊΠΈ_рСпозитория%" CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo pg_listener для сборки статичСского Π±ΠΈΠ½Π°Ρ€Π½ΠΈΠΊΠ°

ΠŸΠΎΠ»ΡƒΡ‡Π°Π΅ΠΌ исполняСмый Ρ„Π°ΠΉΠ» pg_listener. Π—Π°ΠΏΡƒΡΠΊΠ°Ρ‚ΡŒ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡ Π² качСствС ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€ΠΎΠ² ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹Π΅ срСды:

env PGLSN_DB_HOST=localhost PGLSN_DB_PORT=5432 PGLSN_DB_USER=postgres PGLSN_DB_PASS=postgres =demo PGLSN_TABLE_NAMES=queue.events PGLSN_SLOT=my_slot
PGLSN_KAFKA_HOSTS=kafka1.local:9092,kafka2.local:9092,kafka3.local:9092 PGLSN_CHUNKS=1 pg_listener

ΠšΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΡ

Π Π΅ΠΊΠ²ΠΈΠ·ΠΈΡ‚Ρ‹ PostgreSQL:
PGLSN_DB_HOST, PGLSN_DB_PORT, PGLSN_DB_NAME, PGLSN_DB_USER, PGLSN_DB_PASS

ΠžΠΏΡ†ΠΈΠΈ wal2json (https://github.com/eulerto/wal2json):
PGLSN_TABLE_NAMES - содСрТимоС ΠΎΠΏΡ†ΠΈΠΈ 'add-tables' - Π½Π°ΠΈΠΌΠ΅Π½ΠΎΠ²Π°Π½ΠΈΠ΅ Ρ‚Π°Π±Π»ΠΈΡ† Ρ‡Π΅Ρ€Π΅Π· "," INSERT-Ρ‹ Π½Π° ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ…, ΠΏΠ΅Ρ€Π΅Π΄Π°ΡŽΡ‚ΡΡ Π² Kafka
PGLSN_CHUNKS - содСрТимоС ΠΎΠΏΡ†ΠΈΠΈ 'write-in-chunks', Ссли "1", Ρ‚ΠΎ ΠΏΠ°ΠΊΠ΅Ρ‚Π½Ρ‹Π΅ измСнСния ΠΏΠΎΠ»ΡƒΡ‡Π°Π΅Ρ‚, ΠΊΠ°ΠΊ построчныС

ΠžΠΏΡ†ΠΈΠΈ Ρ€Π΅ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΠΈ postgres (https://www.postgresql.org/docs/10/static/protocol-replication.html):
PGLSN_SLOT - слот Ρ€Π΅ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΠΈ ΠΎΡ‚ΠΊΡƒΠ΄Π° ΠΏΠΎΠ»ΡƒΡ‡Π°Ρ‚ΡŒ измСнСния
PGLSN_LSN - Π² Π²ΠΈΠ΄Π΅ строки XXX/XXX, начиная с этого WAL Π½Π°Ρ‡Π°Ρ‚ΡŒ Ρ€Π΅ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΡŽ. По-ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ пустая Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ соотвСтствуСт 0/0.

Π Π΅ΠΊΠ²ΠΈΠ·ΠΈΡ‚Ρ‹ Apache Kafka:
PGLSN_KAFKA_HOSTS - строки hostname:port Ρ‡Π΅Ρ€Π΅Π· Π·Π°ΠΏΡΡ‚ΡƒΡŽ слитно

ΠŸΡ€ΠΈΠΌΠ΅Ρ‡Π°Π½ΠΈΠ΅

Если Π½Π΅ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΠΎΠΏΡ†ΠΈΡŽ write-in-chunks Π΅ΡΡ‚ΡŒ риск, Ρ‡Ρ‚ΠΎ большиС измСнСния Π½Π΅ смогут ΠΎΡ‚Ρ€Π΅ΠΏΠ»ΠΈΡ†ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒΡΡ, Π΄Π°ΠΆΠ΅ Ссли ΠΎΠ½ΠΈ Π½Π΅ относятся ΠΊ отслСТиваСмой Ρ‚Π°Π±Π»ΠΈΡ†Π΅ (eulerto/wal2json#46)

About

Go application to capture inserts into PostgreSQL table and produce events to Apache Kafka using replication slot and WAL

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Go 100.0%