Skip to content

Commit c50f53f

Browse files
committed
add --cursor-table and --history-table to allow multiple concurrent sinks on the same database
1 parent 0833ac4 commit c50f53f

File tree

6 files changed

+38
-26
lines changed

6 files changed

+38
-26
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## v4.2.0
9+
10+
* Added the --cursor-table and --history-table flags to allow running to sinks on the same database (be careful that you have no collision in table names)
11+
* bumped substreams to v1.8.2, add some default network endpoints
12+
813
## v4.1.0
914

1015
* Bumped substreams to v1.7.3

cmd/substreams-sink-sql/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
. "github.com/streamingfast/cli"
1212
"github.com/streamingfast/cli/sflags"
1313
"github.com/streamingfast/dmetrics"
14+
"github.com/streamingfast/substreams-sink-sql/db"
1415
"go.uber.org/zap"
1516
)
1617

@@ -33,6 +34,8 @@ func main() {
3334
flags.Duration("delay-before-start", 0, "[Operator] Amount of time to wait before starting any internal processes, can be used to perform to maintenance on the pod before actually letting it starts")
3435
flags.String("metrics-listen-addr", "localhost:9102", "[Operator] If non-empty, the process will listen on this address for Prometheus metrics request(s)")
3536
flags.String("pprof-listen-addr", "localhost:6060", "[Operator] If non-empty, the process will listen on this address for pprof analysis (see https://golang.org/pkg/net/http/pprof/)")
37+
flags.String("cursors-table", "cursors", "[Operator] Name of the table to use for storing cursors")
38+
flags.String("history-table", "substreams_history", "[Operator] Name of the table to use for storing block history, used to handle reorgs")
3639
}),
3740
AfterAllHook(func(cmd *cobra.Command) {
3841
cmd.PersistentPreRun = preStart
@@ -41,6 +44,10 @@ func main() {
4144
}
4245

4346
func preStart(cmd *cobra.Command, _ []string) {
47+
48+
db.CURSORS_TABLE = sflags.MustGetString(cmd, "cursors-table")
49+
db.HISTORY_TABLE = sflags.MustGetString(cmd, "history-table")
50+
4451
delay := sflags.MustGetDuration(cmd, "delay-before-start")
4552
if delay > 0 {
4653
zlog.Info("sleeping to respect delay before start setting", zap.Duration("delay", delay))

db/db.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import (
1313
"go.uber.org/zap/zapcore"
1414
)
1515

16-
const CURSORS_TABLE = "cursors"
17-
const HISTORY_TABLE = "substreams_history"
16+
var CURSORS_TABLE = "cursors"
17+
var HISTORY_TABLE = "substreams_history"
1818

1919
// Make the typing a bit easier
2020
type OrderedMap[K comparable, V any] struct {

db/dialect_postgres.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,12 @@ func (d postgresDialect) GetCreateCursorQuery(schema string, withPostgraphile bo
187187
out := fmt.Sprintf(cli.Dedent(`
188188
create table if not exists %s.%s
189189
(
190-
id text not null constraint cursor_pk primary key,
190+
id text not null constraint %s primary key,
191191
cursor text,
192192
block_num bigint,
193193
block_id text
194194
);
195-
`), EscapeIdentifier(schema), EscapeIdentifier(CURSORS_TABLE))
195+
`), EscapeIdentifier(schema), EscapeIdentifier(CURSORS_TABLE), EscapeIdentifier(CURSORS_TABLE+"_pk"))
196196
if withPostgraphile {
197197
out += fmt.Sprintf("COMMENT ON TABLE %s.%s IS E'@omit';",
198198
EscapeIdentifier(schema), EscapeIdentifier(CURSORS_TABLE))
@@ -270,7 +270,7 @@ func (d postgresDialect) CreateUser(tx Tx, ctx context.Context, l *Loader, usern
270270
}
271271

272272
func (d postgresDialect) historyTable(schema string) string {
273-
return fmt.Sprintf("%s.%s", EscapeIdentifier(schema), EscapeIdentifier("substreams_history"))
273+
return fmt.Sprintf("%s.%s", EscapeIdentifier(schema), EscapeIdentifier(HISTORY_TABLE))
274274
}
275275

276276
func (d postgresDialect) saveInsert(schema string, table string, primaryKey map[string]string, blockNum uint64) string {

go.mod

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/spf13/pflag v1.0.5
1414
github.com/spf13/viper v1.15.0
1515
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091
16-
github.com/streamingfast/substreams v1.7.3
16+
github.com/streamingfast/substreams v1.8.3-0.20240628134338-5c9843eeef98
1717
github.com/streamingfast/substreams-sink v0.4.0
1818
github.com/streamingfast/substreams-sink-database-changes v1.1.3
1919
github.com/stretchr/testify v1.9.0
@@ -99,13 +99,13 @@ require (
9999
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 // indirect
100100
github.com/subosito/gotenv v1.4.2 // indirect
101101
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf // indirect
102-
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c // indirect
102+
github.com/whyrusleeping/tar-utils v0.0.0-20201201191210-20a61371de5b // indirect
103103
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
104104
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
105105
go.opentelemetry.io/otel v1.26.0 // indirect
106106
go.opentelemetry.io/otel/metric v1.26.0 // indirect
107107
go.opentelemetry.io/otel/trace v1.26.0 // indirect
108-
golang.org/x/sync v0.6.0 // indirect
108+
golang.org/x/sync v0.7.0 // indirect
109109
golang.org/x/time v0.5.0 // indirect
110110
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
111111
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
@@ -159,13 +159,13 @@ require (
159159
go.opencensus.io v0.24.0 // indirect
160160
go.uber.org/atomic v1.11.0 // indirect
161161
go.uber.org/multierr v1.11.0 // indirect
162-
golang.org/x/crypto v0.21.0 // indirect
162+
golang.org/x/crypto v0.23.0 // indirect
163163
golang.org/x/mod v0.13.0 // indirect
164164
golang.org/x/net v0.22.0 // indirect
165165
golang.org/x/oauth2 v0.18.0 // indirect
166-
golang.org/x/sys v0.18.0 // indirect
167-
golang.org/x/term v0.18.0 // indirect
168-
golang.org/x/text v0.14.0 // indirect
166+
golang.org/x/sys v0.20.0 // indirect
167+
golang.org/x/term v0.20.0 // indirect
168+
golang.org/x/text v0.15.0 // indirect
169169
google.golang.org/api v0.172.0 // indirect
170170
google.golang.org/appengine v1.6.8 // indirect
171171
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect

go.sum

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -534,8 +534,8 @@ github.com/streamingfast/schema v0.0.0-20240621180609-1de2e05fe3bd h1:P96NMUr1jD
534534
github.com/streamingfast/schema v0.0.0-20240621180609-1de2e05fe3bd/go.mod h1:XuHkKh98QevgA9M3oWB5Y5Tm6w7iNJ5P3a3ao7UnnfI=
535535
github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAtyaTOgs=
536536
github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8=
537-
github.com/streamingfast/substreams v1.7.3 h1:V4YOJt4tAzivaLpC7nSVDdULMuczj7QfJUTyHAT2qkM=
538-
github.com/streamingfast/substreams v1.7.3/go.mod h1:ao5xpjglDohCmwzRUlJtTNGUn+nXGR2AXBnYQEbvQaI=
537+
github.com/streamingfast/substreams v1.8.3-0.20240628134338-5c9843eeef98 h1:fDo2+v07xEbR0J0sRe8m/W7pnPDYq7dFIAdcHElMNDg=
538+
github.com/streamingfast/substreams v1.8.3-0.20240628134338-5c9843eeef98/go.mod h1:XtL4RgQawes9/a9iM9d6bAABacfIuekY+jceszF7u2c=
539539
github.com/streamingfast/substreams-sink v0.4.0 h1:sU/E9Q4zXTfKxaGUgQ4OMFC+/NBH5VdZx6SOcBVp7P0=
540540
github.com/streamingfast/substreams-sink v0.4.0/go.mod h1:wlF6pAQTBXQGA9k5R1yKg6enHdtXjz1pMlAs4lhZOac=
541541
github.com/streamingfast/substreams-sink-database-changes v1.1.3 h1:rXeGb/V2mjC8FftumRkMQxG2jtdLfHdLx9UQVUtAqS8=
@@ -564,8 +564,8 @@ github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE
564564
github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU=
565565
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
566566
github.com/tsenart/deadcode v0.0.0-20160724212837-210d2dc333e9/go.mod h1:q+QjxYvZ+fpjMXqs+XEriussHjSYqeXVnAdSV1tkMYk=
567-
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c h1:GGsyl0dZ2jJgVT+VvWBf/cNijrHRhkrTjkmp5wg7li0=
568-
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c/go.mod h1:xxcJeBb7SIUl/Wzkz1eVKJE/CB34YNrqX2TQI6jY9zs=
567+
github.com/whyrusleeping/tar-utils v0.0.0-20201201191210-20a61371de5b h1:wA3QeTsaAXybLL2kb2cKhCAQTHgYTMwuI8lBlJSv5V8=
568+
github.com/whyrusleeping/tar-utils v0.0.0-20201201191210-20a61371de5b/go.mod h1:xT1Y5p2JR2PfSZihE0s4mjdJaRGp1waCTf5JzhQLBck=
569569
github.com/wk8/go-ordered-map/v2 v2.1.7 h1:aUZ1xBMdbvY8wnNt77qqo4nyT3y0pX4Usat48Vm+hik=
570570
github.com/wk8/go-ordered-map/v2 v2.1.7/go.mod h1:9Xvgm2mV2kSq2SAm0Y608tBmu8akTzI7c2bz7/G7ZN4=
571571
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
@@ -645,8 +645,8 @@ golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0
645645
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
646646
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
647647
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
648-
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
649-
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
648+
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
649+
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
650650
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
651651
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
652652
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -750,8 +750,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
750750
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
751751
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
752752
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
753-
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
754-
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
753+
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
754+
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
755755
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
756756
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
757757
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -809,15 +809,15 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
809809
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
810810
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
811811
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
812-
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
813-
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
812+
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
813+
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
814814
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
815815
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
816816
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
817817
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
818818
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
819-
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
820-
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
819+
golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw=
820+
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
821821
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
822822
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
823823
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -829,8 +829,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
829829
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
830830
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
831831
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
832-
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
833-
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
832+
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
833+
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
834834
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
835835
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
836836
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=

0 commit comments

Comments
 (0)