Skip to content

Commit db9a178

Browse files
jsvisas1naholiman
authored
eth/filters: retrieve logs in async (ethereum#27135)
This change implements async log retrievals via feeding logs in channels, instead of returning slices. This is a first step to implement ethereum#15063. --------- Signed-off-by: jsvisa <delweng@gmail.com> Co-authored-by: Sina Mahmoodi <itz.s1na@gmail.com> Co-authored-by: Martin Holst Swende <martin@swende.se> Co-authored-by: Sina Mahmoodi <1591639+s1na@users.noreply.github.com>
1 parent 9358b62 commit db9a178

File tree

3 files changed

+320
-153
lines changed

3 files changed

+320
-153
lines changed

eth/filters/filter.go

Lines changed: 103 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -106,32 +106,32 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
106106
}
107107
return f.blockLogs(ctx, header)
108108
}
109-
// Short-cut if all we care about is pending logs
110-
if f.begin == rpc.PendingBlockNumber.Int64() {
111-
if f.end != rpc.PendingBlockNumber.Int64() {
112-
return nil, errors.New("invalid block range")
113-
}
114-
return f.pendingLogs()
115-
}
116-
// Figure out the limits of the filter range
117-
header, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
118-
if header == nil {
119-
return nil, nil
120-
}
109+
121110
var (
122-
err error
123-
head = header.Number.Int64()
124-
pending = f.end == rpc.PendingBlockNumber.Int64()
111+
beginPending = f.begin == rpc.PendingBlockNumber.Int64()
112+
endPending = f.end == rpc.PendingBlockNumber.Int64()
125113
)
114+
115+
// special case for pending logs
116+
if beginPending && !endPending {
117+
return nil, errors.New("invalid block range")
118+
}
119+
120+
// Short-cut if all we care about is pending logs
121+
if beginPending && endPending {
122+
return f.pendingLogs(), nil
123+
}
124+
126125
resolveSpecial := func(number int64) (int64, error) {
127126
var hdr *types.Header
128127
switch number {
129-
case rpc.LatestBlockNumber.Int64():
130-
return head, nil
131-
case rpc.PendingBlockNumber.Int64():
128+
case rpc.LatestBlockNumber.Int64(), rpc.PendingBlockNumber.Int64():
132129
// we should return head here since we've already captured
133130
// that we need to get the pending logs in the pending boolean above
134-
return head, nil
131+
hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
132+
if hdr == nil {
133+
return 0, errors.New("latest header not found")
134+
}
135135
case rpc.FinalizedBlockNumber.Int64():
136136
hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber)
137137
if hdr == nil {
@@ -147,57 +147,92 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
147147
}
148148
return hdr.Number.Int64(), nil
149149
}
150+
151+
var err error
152+
// range query need to resolve the special begin/end block number
150153
if f.begin, err = resolveSpecial(f.begin); err != nil {
151154
return nil, err
152155
}
153156
if f.end, err = resolveSpecial(f.end); err != nil {
154157
return nil, err
155158
}
156-
// Gather all indexed logs, and finish with non indexed ones
159+
160+
logChan, errChan := f.rangeLogsAsync(ctx)
161+
var logs []*types.Log
162+
for {
163+
select {
164+
case log := <-logChan:
165+
logs = append(logs, log)
166+
case err := <-errChan:
167+
if err != nil {
168+
// if an error occurs during extraction, we do return the extracted data
169+
return logs, err
170+
}
171+
// Append the pending ones
172+
if endPending {
173+
pendingLogs := f.pendingLogs()
174+
logs = append(logs, pendingLogs...)
175+
}
176+
return logs, nil
177+
}
178+
}
179+
}
180+
181+
// rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously,
182+
// it creates and returns two channels: one for delivering log data, and one for reporting errors.
183+
func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan error) {
157184
var (
158-
logs []*types.Log
159-
end = uint64(f.end)
160-
size, sections = f.sys.backend.BloomStatus()
185+
logChan = make(chan *types.Log)
186+
errChan = make(chan error)
161187
)
162-
if indexed := sections * size; indexed > uint64(f.begin) {
163-
if indexed > end {
164-
logs, err = f.indexedLogs(ctx, end)
165-
} else {
166-
logs, err = f.indexedLogs(ctx, indexed-1)
167-
}
168-
if err != nil {
169-
return logs, err
188+
189+
go func() {
190+
defer func() {
191+
close(errChan)
192+
close(logChan)
193+
}()
194+
195+
// Gather all indexed logs, and finish with non indexed ones
196+
var (
197+
end = uint64(f.end)
198+
size, sections = f.sys.backend.BloomStatus()
199+
err error
200+
)
201+
if indexed := sections * size; indexed > uint64(f.begin) {
202+
if indexed > end {
203+
indexed = end + 1
204+
}
205+
if err = f.indexedLogs(ctx, indexed-1, logChan); err != nil {
206+
errChan <- err
207+
return
208+
}
170209
}
171-
}
172-
rest, err := f.unindexedLogs(ctx, end)
173-
logs = append(logs, rest...)
174-
if pending {
175-
pendingLogs, err := f.pendingLogs()
176-
if err != nil {
177-
return nil, err
210+
211+
if err := f.unindexedLogs(ctx, end, logChan); err != nil {
212+
errChan <- err
213+
return
178214
}
179-
logs = append(logs, pendingLogs...)
180-
}
181-
return logs, err
215+
216+
errChan <- nil
217+
}()
218+
219+
return logChan, errChan
182220
}
183221

184222
// indexedLogs returns the logs matching the filter criteria based on the bloom
185223
// bits indexed available locally or via the network.
186-
func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
224+
func (f *Filter) indexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {
187225
// Create a matcher session and request servicing from the backend
188226
matches := make(chan uint64, 64)
189227

190228
session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
191229
if err != nil {
192-
return nil, err
230+
return err
193231
}
194232
defer session.Close()
195233

196234
f.sys.backend.ServiceFilter(ctx, session)
197235

198-
// Iterate over the matches until exhausted or context closed
199-
var logs []*types.Log
200-
201236
for {
202237
select {
203238
case number, ok := <-matches:
@@ -207,47 +242,50 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
207242
if err == nil {
208243
f.begin = int64(end) + 1
209244
}
210-
return logs, err
245+
return err
211246
}
212247
f.begin = int64(number) + 1
213248

214249
// Retrieve the suggested block and pull any truly matching logs
215250
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
216251
if header == nil || err != nil {
217-
return logs, err
252+
return err
218253
}
219254
found, err := f.checkMatches(ctx, header)
220255
if err != nil {
221-
return logs, err
256+
return err
257+
}
258+
for _, log := range found {
259+
logChan <- log
222260
}
223-
logs = append(logs, found...)
224261

225262
case <-ctx.Done():
226-
return logs, ctx.Err()
263+
return ctx.Err()
227264
}
228265
}
229266
}
230267

231268
// unindexedLogs returns the logs matching the filter criteria based on raw block
232269
// iteration and bloom matching.
233-
func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
234-
var logs []*types.Log
235-
270+
func (f *Filter) unindexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {
236271
for ; f.begin <= int64(end); f.begin++ {
237-
if f.begin%10 == 0 && ctx.Err() != nil {
238-
return logs, ctx.Err()
239-
}
240272
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
241273
if header == nil || err != nil {
242-
return logs, err
274+
return err
243275
}
244276
found, err := f.blockLogs(ctx, header)
245277
if err != nil {
246-
return logs, err
278+
return err
279+
}
280+
for _, log := range found {
281+
select {
282+
case logChan <- log:
283+
case <-ctx.Done():
284+
return ctx.Err()
285+
}
247286
}
248-
logs = append(logs, found...)
249287
}
250-
return logs, nil
288+
return nil
251289
}
252290

253291
// blockLogs returns the logs matching the filter criteria within a single block.
@@ -294,19 +332,19 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ
294332
}
295333

296334
// pendingLogs returns the logs matching the filter criteria within the pending block.
297-
func (f *Filter) pendingLogs() ([]*types.Log, error) {
335+
func (f *Filter) pendingLogs() []*types.Log {
298336
block, receipts := f.sys.backend.PendingBlockAndReceipts()
299337
if block == nil {
300-
return nil, errors.New("pending state not available")
338+
return nil
301339
}
302340
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
303341
var unfiltered []*types.Log
304342
for _, r := range receipts {
305343
unfiltered = append(unfiltered, r.Logs...)
306344
}
307-
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics), nil
345+
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
308346
}
309-
return nil, nil
347+
return nil
310348
}
311349

312350
func includes(addresses []common.Address, a common.Address) bool {

eth/filters/filter_system_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ type testBackend struct {
5050
rmLogsFeed event.Feed
5151
pendingLogsFeed event.Feed
5252
chainFeed event.Feed
53+
pendingBlock *types.Block
54+
pendingReceipts types.Receipts
5355
}
5456

5557
func (b *testBackend) ChainConfig() *params.ChainConfig {
@@ -124,7 +126,7 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint
124126
}
125127

126128
func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
127-
return nil, nil
129+
return b.pendingBlock, b.pendingReceipts
128130
}
129131

130132
func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {

0 commit comments

Comments
 (0)