Skip to content

Commit e5bbb25

Browse files
Merge pull request #611 from LF-Decentralized-Trust-labs/event-listeners
Event listeners
2 parents fb5582e + 079a6a7 commit e5bbb25

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2044
-210
lines changed

.dockerignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ perf/
2525

2626
# Other folders not needed for the Docker build
2727
example/
28-
sdk/
2928

3029
# The operator has its own docker build (with its own .dockerignore), so we don't want to rebuild the whole
3130
# Paladin docker every time you're re-running the operator/test.

common/go/pkg/pldmsgs/en_descriptions.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,15 @@ var (
167167
TransactionReceiptFiltersDomain = pdm("TransactionReceiptFilters.domain", "Only deliver receipts for an individual domain (only valid with type=private)")
168168
TransactionReceiptOptionsDomainReceipts = pdm("TransactionReceiptOptions.domainReceipts", "When true, a full domain receipt will be generated for each event with complete state data")
169169
TransactionReceiptOptionsIncompleteStateReceiptBehavior = pdm("TransactionReceiptOptions.incompleteStateReceiptBehavior", "When set to 'block_contract', if a transaction with incomplete state data is detected then delivery of all receipts on that individual smart contract address will pause until the missing state arrives. Receipts for other contract addresses continue to be delivered")
170+
BlockchainEventListenerName = pdm("BlockchainEventListener.name", "Unique name for the blockchain event listener")
171+
BlockchainEventListenerCreated = pdm("BlockchainEventListener.created", "Time the listener was created")
172+
BlockchainEventListenerStarted = pdm("BlockchainEventListener.started", "If the listener is started - can be set to false to disable delivery server-side")
173+
BlockchainEventListenerSources = pdm("BlockchainEventListener.sources", "Sources of events")
174+
BlockchainEventListenerOptions = pdm("BlockchainEventListener.options", "Options for the event listener")
175+
BlockchainEventListenerOptionsBatchSize = pdm("BlockchainEventListenerOptions.batchSize", "The maximum number of events to deliver in each batch")
176+
BlockchainEventListenerOptionsBatchTimeout = pdm("BlockchainEventListenerOptions.batchTimeout", "The maximum time to wait for a batch to fill before delivering")
177+
BlockchainEventListenerSourceABI = pdm("BlockchainEventListenerSource.abi", "The ABI containing events to listen for")
178+
BlockchainEventListenerSourceAddress = pdm("BlockchainEventListenerSource.address", "The address to listen for events from")
170179
)
171180

172181
// query/query_json.go

core/go/componenttest/component_test.go

Lines changed: 83 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ import (
3030
"github.com/hyperledger/firefly-signer/pkg/abi"
3131
"github.com/kaleido-io/paladin/config/pkg/confutil"
3232
"github.com/kaleido-io/paladin/core/componenttest/domains"
33-
"github.com/kaleido-io/paladin/core/pkg/blockindexer"
34-
"github.com/kaleido-io/paladin/core/pkg/persistence"
3533

3634
"github.com/kaleido-io/paladin/sdk/go/pkg/pldapi"
3735
"github.com/kaleido-io/paladin/sdk/go/pkg/pldclient"
@@ -50,35 +48,12 @@ func TestRunSimpleStorageEthTransaction(t *testing.T) {
5048
ctx := context.Background()
5149
logrus.SetLevel(logrus.DebugLevel)
5250

53-
instance := newInstanceForComponentTesting(t, deployDomainRegistry(t), nil, nil, nil, false)
54-
cm := instance.cm
51+
instance := newInstanceForComponentTesting(t, deployDomainRegistry(t), nil, nil, nil, true)
5552
c := pldclient.Wrap(instance.client).ReceiptPollingInterval(250 * time.Millisecond)
5653

5754
build, err := solutils.LoadBuild(ctx, simpleStorageBuildJSON)
5855
require.NoError(t, err)
5956

60-
eventStreamEvents := make(chan *pldapi.EventWithData, 2 /* all the events we exepct */)
61-
_, err = cm.BlockIndexer().AddEventStream(ctx, cm.Persistence().NOTX(), &blockindexer.InternalEventStream{
62-
Handler: func(ctx context.Context, tx persistence.DBTX, batch *blockindexer.EventDeliveryBatch) error {
63-
// With SQLite we cannot hang in here with a DB TX - as there's only one per process.
64-
for _, e := range batch.Events {
65-
select {
66-
case eventStreamEvents <- e:
67-
default:
68-
assert.Fail(t, "more than expected number of events received")
69-
}
70-
}
71-
return nil
72-
},
73-
Definition: &blockindexer.EventStream{
74-
Name: "unittest",
75-
Sources: []blockindexer.EventStreamSource{{
76-
ABI: abi.ABI{build.ABI.Events()["Changed"]},
77-
}},
78-
},
79-
})
80-
require.NoError(t, err)
81-
8257
simpleStorage := c.ForABI(ctx, build.ABI).Public().From("key1")
8358

8459
res := simpleStorage.Clone().
@@ -89,6 +64,58 @@ func TestRunSimpleStorageEthTransaction(t *testing.T) {
8964
require.NoError(t, res.Error())
9065
contractAddr := res.Receipt().ContractAddress
9166

67+
// set up the event listener
68+
success, err := c.PTX().CreateBlockchainEventListener(ctx, &pldapi.BlockchainEventListener{
69+
Name: "listener1",
70+
Started: confutil.P(false),
71+
Sources: []pldapi.BlockchainEventListenerSource{{
72+
ABI: abi.ABI{build.ABI.Events()["Changed"]},
73+
Address: contractAddr,
74+
}},
75+
})
76+
require.NoError(t, err)
77+
require.True(t, success)
78+
79+
wsClient, err := c.WebSocket(ctx, instance.wsConfig)
80+
require.NoError(t, err)
81+
82+
sub, err := wsClient.PTX().SubscribeBlockchainEvents(ctx, "listener1")
83+
require.NoError(t, err)
84+
85+
eventData := make(chan string)
86+
87+
go func() {
88+
for {
89+
subNotification, ok := <-sub.Notifications()
90+
if ok {
91+
var batch pldapi.TransactionEventBatch
92+
_ = json.Unmarshal(subNotification.GetResult(), &batch)
93+
for _, e := range batch.Events {
94+
eventData <- e.Data.String()
95+
}
96+
}
97+
err = subNotification.Ack(ctx)
98+
require.NoError(t, err)
99+
}
100+
}()
101+
102+
// pause to make sure that if an event was going to be received, it would have been
103+
ticker1 := time.NewTicker(10 * time.Millisecond)
104+
defer ticker1.Stop()
105+
106+
select {
107+
case <-eventData:
108+
t.FailNow()
109+
case <-ticker1.C:
110+
}
111+
112+
success, err = c.PTX().StartBlockchainEventListener(ctx, "listener1")
113+
require.NoError(t, err)
114+
require.True(t, success)
115+
116+
data := <-eventData
117+
assert.JSONEq(t, `{"x":"11223344"}`, data)
118+
92119
var getX1 pldtypes.RawJSON
93120
err = simpleStorage.Clone().
94121
Function("get").
@@ -105,6 +132,9 @@ func TestRunSimpleStorageEthTransaction(t *testing.T) {
105132
Send().Wait(5 * time.Second)
106133
require.NoError(t, res.Error())
107134

135+
data = <-eventData
136+
assert.JSONEq(t, `{"x":"99887766"}`, data)
137+
108138
var getX2 pldtypes.RawJSON
109139
err = simpleStorage.Clone().
110140
Function("get").
@@ -114,12 +144,34 @@ func TestRunSimpleStorageEthTransaction(t *testing.T) {
114144
require.NoError(t, err)
115145
assert.JSONEq(t, `{"x":"99887766"}`, getX2.Pretty())
116146

117-
// Expect our event listener to be queued up with two Changed events
118-
event1 := <-eventStreamEvents
119-
assert.JSONEq(t, `{"x":"11223344"}`, string(event1.Data))
120-
event2 := <-eventStreamEvents
121-
assert.JSONEq(t, `{"x":"99887766"}`, string(event2.Data))
147+
// stop the event listener
148+
success, err = c.PTX().StopBlockchainEventListener(ctx, "listener1")
149+
require.NoError(t, err)
150+
require.True(t, success)
151+
152+
res = simpleStorage.Clone().
153+
Function("set").
154+
To(contractAddr).
155+
Inputs(`{"_x":1234}`).
156+
Send().Wait(5 * time.Second)
157+
require.NoError(t, res.Error())
158+
159+
// pause to make sure that if an event was going to be received, it would have been
160+
ticker2 := time.NewTicker(10 * time.Millisecond)
161+
defer ticker2.Stop()
162+
163+
select {
164+
case <-eventData:
165+
t.FailNow()
166+
case <-ticker2.C:
167+
}
168+
169+
success, err = c.PTX().StartBlockchainEventListener(ctx, "listener1")
170+
require.NoError(t, err)
171+
require.True(t, success)
122172

173+
data = <-eventData
174+
assert.JSONEq(t, `{"x":"1234"}`, data)
123175
}
124176

125177
func TestUpdatePublicTransaction(t *testing.T) {
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
BEGIN;
2+
3+
ALTER TABLE "event_streams" DROP COLUMN "started";
4+
5+
COMMIT;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
BEGIN;
2+
3+
ALTER TABLE "event_streams" ADD "started" BOOLEAN DEFAULT TRUE;
4+
5+
COMMIT;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE "event_streams" DROP COLUMN "started";
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE "event_streams" ADD "started" BOOLEAN DEFAULT TRUE;

core/go/internal/componentmgr/manager.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,9 @@ func (cm *componentManager) startBlockIndexer() (err error) {
315315
_, err = cm.blockIndexer.GetBlockListenerHeight(cm.bgCtx)
316316
err = cm.wrapIfErr(err, msgs.MsgComponentBlockIndexerStartError)
317317
}
318+
if err == nil {
319+
err = cm.txManager.LoadBlockchainEventListeners()
320+
}
318321
return err
319322
}
320323

core/go/internal/componentmgr/manager_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ func TestStartOK(t *testing.T) {
201201
mockTxManager := componentmocks.NewTXManager(t)
202202
mockTxManager.On("Start").Return(nil)
203203
mockTxManager.On("Stop").Return()
204+
mockTxManager.On("LoadBlockchainEventListeners").Return(nil)
204205

205206
mockGroupManager := componentmocks.NewGroupManager(t)
206207
mockGroupManager.On("Start").Return(nil)

core/go/internal/components/txmgr.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@ type ReceiptReceiver interface {
7777
DeliverReceiptBatch(ctx context.Context, batchID uint64, receipts []*pldapi.TransactionReceiptFull) error
7878
}
7979

80-
type ReceiptReceiverCloser interface {
80+
type BlockchainEventReceiver interface {
81+
DeliverBlockchainEventBatch(ctx context.Context, batchID uuid.UUID, events []*pldapi.EventWithData) error
82+
}
83+
84+
type ReceiverCloser interface {
8185
Close()
8286
}
8387

@@ -118,10 +122,11 @@ type TXManager interface {
118122
StartReceiptListener(ctx context.Context, name string) error
119123
StopReceiptListener(ctx context.Context, name string) error
120124
DeleteReceiptListener(ctx context.Context, name string) error
121-
AddReceiptReceiver(ctx context.Context, name string, r ReceiptReceiver) (ReceiptReceiverCloser, error)
125+
AddReceiptReceiver(ctx context.Context, name string, r ReceiptReceiver) (ReceiverCloser, error)
122126

123127
// These functions for use of other components
124128

129+
LoadBlockchainEventListeners() error
125130
NotifyStatesDBChanged(ctx context.Context) // called by state manager after committing DB TXs writing new states that might fill in gaps
126131
PrepareInternalPrivateTransaction(ctx context.Context, dbTX persistence.DBTX, tx *pldapi.TransactionInput, submitMode pldapi.SubmitMode) (*ValidatedTransaction, error)
127132
UpsertInternalPrivateTxsFinalizeIDs(ctx context.Context, dbTX persistence.DBTX, txis []*ValidatedTransaction) error

core/go/internal/domainmgr/domain.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,8 @@ func (d *domain) processDomainConfig(dbTX persistence.DBTX, confRes *prototk.Con
173173

174174
// Create the event stream
175175
d.eventStream, err = d.dm.blockIndexer.AddEventStream(d.ctx, dbTX, &blockindexer.InternalEventStream{
176-
Definition: stream,
177-
Handler: d.handleEventBatch,
176+
Definition: stream,
177+
HandlerDBTX: d.handleEventBatch,
178178
})
179179
if err != nil {
180180
return nil, err

core/go/internal/domainmgr/event_indexer_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,10 @@ func TestEventIndexingWithDB(t *testing.T) {
7878
Signature: eventSig_PaladinRegisterSmartContract_V0,
7979
},
8080
Data: pldtypes.RawJSON(`{
81-
"txId": "` + pldtypes.Bytes32UUIDFirst16(deployTX).String() + `",
82-
"instance": "` + contractAddr.String() + `",
83-
"config": "0xfeedbeef"
84-
}`),
81+
"txId": "` + pldtypes.Bytes32UUIDFirst16(deployTX).String() + `",
82+
"instance": "` + contractAddr.String() + `",
83+
"config": "0xfeedbeef"
84+
}`),
8585
},
8686
},
8787
})
@@ -143,8 +143,8 @@ func TestEventIndexingBadEvent(t *testing.T) {
143143
Address: *td.d.registryAddress,
144144
SoliditySignature: eventSolSig_PaladinRegisterSmartContract_V0,
145145
Data: pldtypes.RawJSON(`{
146-
"config": "cannot parse this"
147-
}`),
146+
"config": "cannot parse this"
147+
}`),
148148
},
149149
},
150150
})
@@ -185,10 +185,10 @@ func TestEventIndexingInsertError(t *testing.T) {
185185
Signature: eventSig_PaladinRegisterSmartContract_V0,
186186
},
187187
Data: pldtypes.RawJSON(`{
188-
"txId": "` + pldtypes.Bytes32UUIDFirst16(deployTX).String() + `",
189-
"domain": "` + contractAddr.String() + `",
190-
"data": "0xfeedbeef"
191-
}`),
188+
"txId": "` + pldtypes.Bytes32UUIDFirst16(deployTX).String() + `",
189+
"domain": "` + contractAddr.String() + `",
190+
"data": "0xfeedbeef"
191+
}`),
192192
},
193193
},
194194
})

0 commit comments

Comments
 (0)