ΠΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅ Π΄Π»Ρ Π΄ΠΎΡΡΠ°Π²ΠΊΠΈ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠΉ ΠΈΠ· PostgreSQL Π² Apache Kafka ΡΠ΅ΡΠ΅Π· Π»ΠΎΠ³ΠΈΡΠ΅ΡΠΊΠΎΠ΅ Π΄Π΅ΠΊΠΎΠ΄ΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅
Go application to capture inserts to PostgreSQL table and produce events to Apache Kafka using replication slot and WAL
Π‘ΠΎΠ΅Π΄ΠΈΠ½ΡΠ΅ΡΡΡ ΠΊ ΡΠ»ΠΎΡΡ ΡΠ΅ΠΏΠ»ΠΈΠΊΠ°ΡΠΈΠΈ ΠΈ ΡΠ»ΡΡΠ°Π΅Ρ INSERT-Ρ Π² ΡΠΊΠ°Π·Π°Π½Π½ΡΠ΅ ΡΠ°Π±Π»ΠΈΡΡ, ΠΎΡΠΊΡΠ΄Π° ΠΊΠΎΠΏΠΈΡΡΠ΅Ρ Π·Π½Π°ΡΠ΅Π½ΠΈΠ΅ ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ topic ΠΈ payload, ΠΏΠΎΡΠΎΠΌ ΠΎΡΠΏΡΠ°Π²Π»ΡΠ΅Ρ ΡΡΠΎ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ Π² Kafka. ΠΡΠΈΠΌΠ΅Ρ ΡΠ°Π±Π»ΠΈΡΡ Π΄Π»Ρ ΠΏΡΠΎΡΠ»ΡΡΠΈΠ²Π°Π½ΠΈΡ:
CREATE TABLE queue.events (
id bigserial primary key,
added_at timestamp NOT NULL default clock_timestamp(),
topic text NOT NULL,
payload json
);
- ΡΡΠ΅Π±ΡΠ΅Ρ ΡΡΡΠ°Π½ΠΎΠ²Π»Π΅Π½Π½ΡΠΉ https://github.com/eulerto/wal2json Π½Π° ΡΠ΅ΡΠ²Π΅ΡΠ΅ PostgreSQL
- ΡΠΎΠ·Π΄Π°ΡΡ ΡΠ»ΠΎΡ ΡΠ΅ΠΏΠ»ΠΈΠΊΠ°ΡΠΈΠΈ Π½Π° PostgreSQL
SELECT pg_create_logical_replication_slot('my_slot','wal2json')
- ΡΡΡΠ°Π½ΠΎΠ²ΠΈΡΡ Go 1.9 ΠΈΠ»ΠΈ Π²ΡΡΠ΅
- ΡΡΡΠ°Π½ΠΎΠ²ΠΈΡΡ ΠΌΠ΅Π½Π΅Π΄ΠΆΠ΅Ρ ΠΏΠ°ΠΊΠ΅ΡΠΎΠ² https://github.com/golang/dep (Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ:
$ GOPATH="/home/user/go"
;$ go get -u github.com/golang/dep/cmd/dep
) git clone %ΡΡΠΎΡ_ΡΠ΅ΠΏΠΎΠ·ΠΈΡΠΎΡΠΈΠΉ%
cd pg_listener
env GOPATH="%ΠΏΠΎΠ»Π½ΡΠΉ_ΠΏΡΡΡ_Π΄ΠΎ_ΠΏΠ°ΠΏΠΊΠΈ_ΡΠ΅ΠΏΠΎΠ·ΠΈΡΠΎΡΠΈΡ%" go build pg_listener
- ΠΠΠ
GOPATH="%ΠΏΠΎΠ»Π½ΡΠΉ_ΠΏΡΡΡ_Π΄ΠΎ_ΠΏΠ°ΠΏΠΊΠΈ_ΡΠ΅ΠΏΠΎΠ·ΠΈΡΠΎΡΠΈΡ%" CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo pg_listener
Π΄Π»Ρ ΡΠ±ΠΎΡΠΊΠΈ ΡΡΠ°ΡΠΈΡΠ΅ΡΠΊΠΎΠ³ΠΎ Π±ΠΈΠ½Π°ΡΠ½ΠΈΠΊΠ°
ΠΠΎΠ»ΡΡΠ°Π΅ΠΌ ΠΈΡΠΏΠΎΠ»Π½ΡΠ΅ΠΌΡΠΉ ΡΠ°ΠΉΠ» pg_listener. ΠΠ°ΠΏΡΡΠΊΠ°ΡΡ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΡ Π² ΠΊΠ°ΡΠ΅ΡΡΠ²Π΅ ΠΏΠ°ΡΠ°ΠΌΠ΅ΡΡΠΎΠ² ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΡΠ΅ ΡΡΠ΅Π΄Ρ:
env PGLSN_DB_HOST=localhost PGLSN_DB_PORT=5432 PGLSN_DB_USER=postgres PGLSN_DB_PASS=postgres =demo PGLSN_TABLE_NAMES=queue.events PGLSN_SLOT=my_slot
PGLSN_KAFKA_HOSTS=kafka1.local:9092,kafka2.local:9092,kafka3.local:9092 PGLSN_CHUNKS=1 pg_listener
Π Π΅ΠΊΠ²ΠΈΠ·ΠΈΡΡ PostgreSQL:
PGLSN_DB_HOST, PGLSN_DB_PORT, PGLSN_DB_NAME, PGLSN_DB_USER, PGLSN_DB_PASS
ΠΠΏΡΠΈΠΈ wal2json (https://github.com/eulerto/wal2json):
PGLSN_TABLE_NAMES - ΡΠΎΠ΄Π΅ΡΠΆΠΈΠΌΠΎΠ΅ ΠΎΠΏΡΠΈΠΈ 'add-tables' - Π½Π°ΠΈΠΌΠ΅Π½ΠΎΠ²Π°Π½ΠΈΠ΅ ΡΠ°Π±Π»ΠΈΡ ΡΠ΅ΡΠ΅Π· "," INSERT-Ρ Π½Π° ΠΊΠΎΡΠΎΡΡΡ
, ΠΏΠ΅ΡΠ΅Π΄Π°ΡΡΡΡ Π² Kafka
PGLSN_CHUNKS - ΡΠΎΠ΄Π΅ΡΠΆΠΈΠΌΠΎΠ΅ ΠΎΠΏΡΠΈΠΈ 'write-in-chunks', Π΅ΡΠ»ΠΈ "1", ΡΠΎ ΠΏΠ°ΠΊΠ΅ΡΠ½ΡΠ΅ ΠΈΠ·ΠΌΠ΅Π½Π΅Π½ΠΈΡ ΠΏΠΎΠ»ΡΡΠ°Π΅Ρ, ΠΊΠ°ΠΊ ΠΏΠΎΡΡΡΠΎΡΠ½ΡΠ΅
ΠΠΏΡΠΈΠΈ ΡΠ΅ΠΏΠ»ΠΈΠΊΠ°ΡΠΈΠΈ postgres (https://www.postgresql.org/docs/10/static/protocol-replication.html):
PGLSN_SLOT - ΡΠ»ΠΎΡ ΡΠ΅ΠΏΠ»ΠΈΠΊΠ°ΡΠΈΠΈ ΠΎΡΠΊΡΠ΄Π° ΠΏΠΎΠ»ΡΡΠ°ΡΡ ΠΈΠ·ΠΌΠ΅Π½Π΅Π½ΠΈΡ
PGLSN_LSN - Π² Π²ΠΈΠ΄Π΅ ΡΡΡΠΎΠΊΠΈ XXX/XXX, Π½Π°ΡΠΈΠ½Π°Ρ Ρ ΡΡΠΎΠ³ΠΎ WAL Π½Π°ΡΠ°ΡΡ ΡΠ΅ΠΏΠ»ΠΈΠΊΠ°ΡΠΈΡ. ΠΠΎ-ΡΠΌΠΎΠ»ΡΠ°Π½ΠΈΡ ΠΏΡΡΡΠ°Ρ Π·Π½Π°ΡΠ΅Π½ΠΈΠ΅, ΠΊΠΎΡΠΎΡΠΎΠ΅ ΡΠΎΠΎΡΠ²Π΅ΡΡΡΠ²ΡΠ΅Ρ 0/0.
Π Π΅ΠΊΠ²ΠΈΠ·ΠΈΡΡ Apache Kafka:
PGLSN_KAFKA_HOSTS - ΡΡΡΠΎΠΊΠΈ hostname:port ΡΠ΅ΡΠ΅Π· Π·Π°ΠΏΡΡΡΡ ΡΠ»ΠΈΡΠ½ΠΎ
ΠΡΠ»ΠΈ Π½Π΅ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΡ ΠΎΠΏΡΠΈΡ write-in-chunks
Π΅ΡΡΡ ΡΠΈΡΠΊ, ΡΡΠΎ Π±ΠΎΠ»ΡΡΠΈΠ΅ ΠΈΠ·ΠΌΠ΅Π½Π΅Π½ΠΈΡ Π½Π΅ ΡΠΌΠΎΠ³ΡΡ ΠΎΡΡΠ΅ΠΏΠ»ΠΈΡΠΈΡΠΎΠ²Π°ΡΡΡΡ, Π΄Π°ΠΆΠ΅ Π΅ΡΠ»ΠΈ ΠΎΠ½ΠΈ Π½Π΅ ΠΎΡΠ½ΠΎΡΡΡΡΡ ΠΊ ΠΎΡΡΠ»Π΅ΠΆΠΈΠ²Π°Π΅ΠΌΠΎΠΉ ΡΠ°Π±Π»ΠΈΡΠ΅ (eulerto/wal2json#46)