Skip to content

Commit 2a950ae

Browse files
1. Expire non-provider records older than MaxAge
2. Original publisher shoulld republish putvalue records
1 parent a12e621 commit 2a950ae

File tree

8 files changed

+208
-13
lines changed

8 files changed

+208
-13
lines changed

dht.go

+5
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
100100
dht.proc.AddChild(dht.providers.Process())
101101
dht.Validator = cfg.Validator
102102

103+
// proc to expire putValue records
104+
recordExpiryProc := goprocessctx.WithContext(ctx)
105+
recordExpiryProc.Go(dht.expireNonProviderRecords)
106+
dht.proc.AddChild(recordExpiryProc)
107+
103108
if !cfg.Client {
104109
for _, p := range cfg.Protocols {
105110
h.SetStreamHandler(p, dht.handleNewStream)

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ require (
2121
github.com/mr-tron/base58 v1.1.2
2222
github.com/multiformats/go-multiaddr v0.0.4
2323
github.com/multiformats/go-multiaddr-dns v0.0.2
24+
github.com/multiformats/go-multihash v0.0.5
2425
github.com/multiformats/go-multistream v0.1.0
2526
github.com/stretchr/testify v1.3.0
2627
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc

handlers.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
121121
recordIsBad = true
122122
}
123123

124-
if time.Since(recvtime) > MaxRecordAge {
124+
if time.Since(recvtime) > maxNonProviderRecordAge {
125125
logger.Debug("old record found, tossing.")
126126
recordIsBad = true
127127
}

providers/providers.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,11 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)
7474
return pm
7575
}
7676

77-
const providersKeyPrefix = "/providers/"
77+
// prefix to be used for all provider record keys
78+
const ProvidersKeyPrefix = "/providers/"
7879

7980
func mkProvKey(k cid.Cid) string {
80-
return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes())
81+
return ProvidersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes())
8182
}
8283

8384
func (pm *ProviderManager) Process() goprocess.Process {
@@ -284,7 +285,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
284285

285286
// Now, kick off a GC of the datastore.
286287
q, err := pm.dstore.Query(dsq.Query{
287-
Prefix: providersKeyPrefix,
288+
Prefix: ProvidersKeyPrefix,
288289
})
289290
if err != nil {
290291
log.Error("provider record GC query failed: ", err)

providers/providers_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func TestProvidesExpire(t *testing.T) {
185185
t.Fatal("providers map not cleaned up")
186186
}
187187

188-
res, err := ds.Query(dsq.Query{Prefix: providersKeyPrefix})
188+
res, err := ds.Query(dsq.Query{Prefix: ProvidersKeyPrefix})
189189
if err != nil {
190190
t.Fatal(err)
191191
}

records.go

+69-5
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,30 @@ package dht
33
import (
44
"context"
55
"fmt"
6+
"strings"
67
"time"
78

9+
"github.com/gogo/protobuf/proto"
10+
ds "github.com/ipfs/go-datastore"
11+
"github.com/ipfs/go-datastore/query"
12+
u "github.com/ipfs/go-ipfs-util"
13+
"github.com/jbenet/goprocess"
14+
ci "github.com/libp2p/go-libp2p-core/crypto"
815
"github.com/libp2p/go-libp2p-core/peer"
916
"github.com/libp2p/go-libp2p-core/routing"
10-
11-
ci "github.com/libp2p/go-libp2p-core/crypto"
17+
"github.com/libp2p/go-libp2p-kad-dht/providers"
18+
recpb "github.com/libp2p/go-libp2p-record/pb"
1219
)
1320

14-
// MaxRecordAge specifies the maximum time that any node will hold onto a record
21+
// maxNonProviderRecordAge specifies the maximum time that any node will hold onto a record
1522
// from the time its received. This does not apply to any other forms of validity that
1623
// the record may contain.
1724
// For example, a record may contain an ipns entry with an EOL saying its valid
1825
// until the year 2020 (a great time in the future). For that record to stick around
19-
// it must be rebroadcasted more frequently than once every 'MaxRecordAge'
20-
const MaxRecordAge = time.Hour * 36
26+
// it must be rebroadcasted more frequently than once every 'maxNonProviderRecordAge'
27+
var maxNonProviderRecordAge = time.Hour * 12
28+
29+
var defaultRecordsSweepInterval = time.Hour * 1
2130

2231
type pubkrs struct {
2332
pubk ci.PubKey
@@ -135,3 +144,58 @@ func (dht *IpfsDHT) getPublicKeyFromNode(ctx context.Context, p peer.ID) (ci.Pub
135144
logger.Debugf("Got public key from node %v itself", p)
136145
return pubk, nil
137146
}
147+
148+
func (dht *IpfsDHT) expireNonProviderRecords(proc goprocess.Process) {
149+
for {
150+
select {
151+
case <-proc.Closing():
152+
return
153+
case <-time.After(defaultRecordsSweepInterval):
154+
}
155+
156+
res, err := dht.datastore.Query(query.Query{Filters: []query.Filter{&expireRecordFilter{}}})
157+
if err != nil {
158+
logger.Errorf("expire records proc: failed to run query against datastore, error is %+v", err)
159+
continue
160+
}
161+
162+
for {
163+
e, ok := res.NextSync()
164+
if !ok {
165+
break
166+
}
167+
if err := dht.datastore.Delete(ds.RawKey(e.Key)); err != nil {
168+
logger.Errorf("expire records proc: failed to delete key %s from datastore, error is %+v", e.Key, err)
169+
}
170+
}
171+
}
172+
}
173+
174+
type expireRecordFilter struct{}
175+
176+
func (f *expireRecordFilter) Filter(e query.Entry) bool {
177+
// unmarshal record
178+
rec := new(recpb.Record)
179+
if err := proto.Unmarshal(e.Value, rec); err != nil {
180+
logger.Debugf("expire records filter: failed to unmarshal DHT record from datastore, error is %+v", err)
181+
return false
182+
}
183+
184+
// should not be a provider record
185+
if strings.HasPrefix(e.Key, providers.ProvidersKeyPrefix) {
186+
return false
187+
}
188+
189+
// age should be greater than maxNonProviderRecordAge
190+
t, err := u.ParseRFC3339(rec.TimeReceived)
191+
if err != nil {
192+
logger.Debugf("expire records filter: failed to parse time in DHT record, error is %+v", err)
193+
return false
194+
}
195+
196+
if time.Since(t) > maxNonProviderRecordAge {
197+
return true
198+
}
199+
200+
return false
201+
}

records_test.go

+103-3
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,22 @@ package dht
33
import (
44
"context"
55
"crypto/rand"
6-
"github.com/libp2p/go-libp2p-core/test"
76
"testing"
87
"time"
98

9+
"github.com/ipfs/go-cid"
10+
"github.com/libp2p/go-libp2p-core/test"
11+
"github.com/multiformats/go-multihash"
12+
"github.com/stretchr/testify/assert"
13+
14+
ds "github.com/ipfs/go-datastore"
1015
u "github.com/ipfs/go-ipfs-util"
1116
ci "github.com/libp2p/go-libp2p-core/crypto"
1217
"github.com/libp2p/go-libp2p-core/peer"
1318
"github.com/libp2p/go-libp2p-core/routing"
14-
record "github.com/libp2p/go-libp2p-record"
15-
tnet "github.com/libp2p/go-libp2p-testing/net"
19+
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
20+
"github.com/libp2p/go-libp2p-record"
21+
"github.com/libp2p/go-libp2p-testing/net"
1622
)
1723

1824
// Check that GetPublicKey() correctly extracts a public key
@@ -305,3 +311,97 @@ func TestPubkeyGoodKeyFromDHTGoodKeyDirect(t *testing.T) {
305311
t.Fatal("got incorrect public key")
306312
}
307313
}
314+
315+
func TestExpireNonProviderRecords(t *testing.T) {
316+
sVal := defaultRecordsSweepInterval
317+
defer func() { defaultRecordsSweepInterval = sVal }()
318+
319+
defaultRecordsSweepInterval = 20 * time.Millisecond
320+
321+
// create dht
322+
ctx, cancel := context.WithCancel(context.Background())
323+
defer cancel()
324+
d := setupDHT(ctx, t, false)
325+
326+
putRecord := func(key string, value []byte) error {
327+
rec := record.MakePutRecord(key, value)
328+
pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
329+
pmes.Record = rec
330+
_, err := d.handlePutValue(ctx, "testpeer", pmes)
331+
return err
332+
}
333+
334+
addProv := func(c cid.Cid) error {
335+
msg, err := d.makeProvRecord(c)
336+
pi := peer.AddrInfo{
337+
ID: "testpeer",
338+
Addrs: d.host.Addrs(),
339+
}
340+
msg.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi})
341+
assert.NoError(t, err)
342+
343+
_, err = d.handleAddProvider(ctx, "testpeer", msg)
344+
return err
345+
}
346+
347+
getProv := func(c cid.Cid) (*pb.Message, error) {
348+
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, c.Bytes(), 0)
349+
m, err := d.handleGetProviders(ctx, "test peer", pmes)
350+
return m, err
351+
}
352+
353+
// put non-provider record 1 with current time
354+
key1 := "/v/key1"
355+
value1 := []byte("v1")
356+
assert.NoError(t, putRecord(key1, value1))
357+
358+
// put non-provider record 2 with current time
359+
key2 := "/v/key2"
360+
value2 := []byte("v2")
361+
assert.NoError(t, putRecord(key2, value2))
362+
363+
// add provider with current time
364+
mh, err := multihash.Sum([]byte("data"), multihash.SHA2_256, -1)
365+
assert.NoError(t, err)
366+
c := cid.NewCidV0(mh)
367+
assert.NoError(t, addProv(c))
368+
369+
// sweep will not delete any of them
370+
time.Sleep(100 * time.Millisecond)
371+
372+
// get & verify all are present
373+
374+
// we need to check the datastore for non-provider records to test the expiry Proc
375+
// because a side-effect of handle get value is also that it deletes records which are beyond MaxAge
376+
// & we do not want to hit that path
377+
_, err = d.datastore.Get(convertToDsKey([]byte(key1)))
378+
assert.NoError(t, err)
379+
380+
_, err = d.datastore.Get(convertToDsKey([]byte(key2)))
381+
assert.NoError(t, err)
382+
383+
// ensure provider record is still available
384+
m, err := getProv(c)
385+
assert.NoError(t, err)
386+
assert.NotEmpty(t, m.ProviderPeers)
387+
388+
// change max age to 100 millisecond
389+
mVal := maxNonProviderRecordAge
390+
maxNonProviderRecordAge = 100 * time.Millisecond
391+
defer func() { maxNonProviderRecordAge = mVal }()
392+
393+
// sweep will remove non-provider both records now
394+
time.Sleep(100 * time.Millisecond)
395+
396+
// verify both non-provider records are absent
397+
_, err = d.datastore.Get(convertToDsKey([]byte(key1)))
398+
assert.Equal(t, ds.ErrNotFound, err)
399+
400+
_, err = d.datastore.Get(convertToDsKey([]byte(key2)))
401+
assert.Equal(t, ds.ErrNotFound, err)
402+
403+
// but, provider record will still be available
404+
m, err = getProv(c)
405+
assert.NoError(t, err)
406+
assert.NotEmpty(t, m.ProviderPeers)
407+
}

routing.go

+24
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
// results will wait for the channel to drain.
2828
var asyncQueryBuffer = 10
2929

30+
var putValueRepublishInterval = 6 * time.Hour
31+
3032
// This file implements the Routing interface for the IpfsDHT struct.
3133

3234
// Basic Put/Get
@@ -98,6 +100,28 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
98100
}(p)
99101
}
100102
wg.Wait()
103+
104+
// original publisher should keep re-publishing the record because the network isn't `steady`/`stable`
105+
// and the K closet peers we just published to can become unavailable / no longer be the K closet
106+
go func() {
107+
for {
108+
select {
109+
case <-dht.proc.Closing():
110+
return
111+
case <-time.After(putValueRepublishInterval):
112+
// TODO:We can not re-use the original context here as it may have expired
113+
// But, is it fair to use this one ?
114+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
115+
if err := dht.PutValue(ctx, key, value, opts...); err != nil {
116+
logger.Errorf("putValue republish proc: failed to republish key %s, error is %+v", key, err)
117+
} else {
118+
logger.Debugf("putValue republish proc: successfully republished key %s", key)
119+
}
120+
cancel()
121+
}
122+
}
123+
}()
124+
101125
return nil
102126
}
103127

0 commit comments

Comments
 (0)