Skip to content

Commit aa3efed

Browse files
authored
Merge pull request #159 from kaleido-io/log-filter-fix
ensure only offsets are pulled after a filter is esstablished
2 parents b561f24 + f7312c7 commit aa3efed

File tree

3 files changed

+25
-2
lines changed

3 files changed

+25
-2
lines changed

internal/ethereum/event_actions_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ func mockStreamLoopEmpty(mRPC *rpcbackendmocks.Backend) {
8888
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
8989
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
9090
}).Maybe()
91+
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
92+
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
93+
}).Maybe()
9194
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
9295
*args[1].(*bool) = true
9396
}).Maybe()

internal/ethereum/event_stream.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ func (es *eventStream) leadGroupSteadyState() bool {
302302
lastUpdate := -1
303303
failCount := 0
304304
filterResetRequired := false
305+
filterRPCMethodToUse := ""
305306
for {
306307
if es.c.doFailureDelay(es.ctx, failCount) {
307308
log.L(es.ctx).Debugf("Stream loop exiting")
@@ -329,6 +330,7 @@ func (es *eventStream) leadGroupSteadyState() bool {
329330
es.uninstallFilter(&filter)
330331
}
331332
filterResetRequired = false
333+
filterRPCMethodToUse = "eth_getFilterLogs" // first JSON/RPC for a new filter ID fetches all the historical logs to ensure no gaps
332334
// Determine the earliest block we need to poll from
333335
fromBlock := int64(-1)
334336
for _, l := range ag.listeners {
@@ -362,17 +364,18 @@ func (es *eventStream) leadGroupSteadyState() bool {
362364
}
363365
// Get the next batch of logs
364366
var ethLogs []*logJSONRPC
365-
rpcErr := es.c.backend.CallRPC(es.ctx, &ethLogs, "eth_getFilterLogs", filter)
367+
rpcErr := es.c.backend.CallRPC(es.ctx, &ethLogs, filterRPCMethodToUse, filter)
366368
// If we fail to query we just retry - setting filter to nil if not found
367369
if rpcErr != nil {
368370
if mapError(filterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound {
369371
log.L(es.ctx).Infof("Filter '%v' reset: %s", filter, rpcErr.Message)
370372
filter = ""
371373
}
372-
log.L(es.ctx).Errorf("Failed to query filter (eth_getFilterLogs): %s", rpcErr.Message)
374+
log.L(es.ctx).Errorf("Failed to query filter (%s): %s", filterRPCMethodToUse, rpcErr.Message)
373375
failCount++
374376
continue
375377
}
378+
filterRPCMethodToUse = "eth_getFilterChanges" // subsequent JSON/RPC calls after the initial fetch, this fetches only the new logs
376379
// Enrich the events
377380
events, enrichErr := es.filterEnrichSort(es.ctx, ag, ethLogs)
378381
if enrichErr != nil {

internal/ethereum/event_stream_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,11 @@ func TestLeadGroupDeliverEvents(t *testing.T) {
391391
Run(func(args mock.Arguments) {
392392
*args[1].(*string) = testLogsFilterID1
393393
}).Once()
394+
394395
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
396+
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
397+
}).Maybe()
398+
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
395399
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{
396400
{
397401
BlockNumber: ethtypes.NewHexInteger64(212122),
@@ -408,6 +412,9 @@ func TestLeadGroupDeliverEvents(t *testing.T) {
408412
},
409413
}
410414
}).Once()
415+
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
416+
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{}
417+
})
411418
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c", false).Return(nil).Run(func(args mock.Arguments) {
412419
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
413420
Number: ethtypes.NewHexInteger64(212122),
@@ -464,6 +471,7 @@ func TestLeadGroupNearBlockZeroEnsureNonNegative(t *testing.T) {
464471
}).Once().Run(func(args mock.Arguments) {
465472
close(filtered)
466473
})
474+
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Maybe()
467475
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
468476
*args[1].(*bool) = true
469477
}).Maybe()
@@ -648,6 +656,9 @@ func TestStreamLoopChangeFilter(t *testing.T) {
648656
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
649657
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
650658
}).Maybe()
659+
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
660+
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
661+
}).Maybe()
651662
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
652663
*args[1].(*bool) = true
653664
}).Maybe()
@@ -690,6 +701,9 @@ func TestStreamLoopFilterReset(t *testing.T) {
690701
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
691702
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
692703
}).Maybe()
704+
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
705+
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
706+
}).Maybe()
693707
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
694708
*args[1].(*bool) = true
695709
}).Maybe()
@@ -747,6 +761,9 @@ func TestStreamLoopEnrichFail(t *testing.T) {
747761
close(errorReturned)
748762
}).
749763
Return(&rpcbackend.RPCError{Message: "pop"}).Once()
764+
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
765+
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
766+
}).Maybe()
750767
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
751768
*args[1].(*bool) = true
752769
}).Maybe()

0 commit comments

Comments
 (0)