Skip to content

Commit 018fa25

Browse files
committed
moved nesting dht into a separate package. removed ability to be a server of the inner dht and a client of the outer dht from the nesting dht.
1 parent b81a3e9 commit 018fa25

File tree

3 files changed

+125
-192
lines changed

3 files changed

+125
-192
lines changed

dht_net.go

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"bufio"
55
"context"
66
"fmt"
7-
"github.com/libp2p/go-libp2p-core/host"
8-
"github.com/libp2p/go-libp2p-core/protocol"
97
"io"
108
"sync"
119
"time"
@@ -270,14 +268,6 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
270268
return nil
271269
}
272270

273-
func (dht *IpfsDHT) getHost() host.Host {
274-
return dht.host
275-
}
276-
277-
func (dht *IpfsDHT) getProtocols() []protocol.ID {
278-
return append([]protocol.ID{},dht.protocols...)
279-
}
280-
281271
func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
282272
dht.smlk.Lock()
283273
ms, ok := dht.strmap[p]
@@ -310,17 +300,12 @@ func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messa
310300
return ms, nil
311301
}
312302

313-
type protocolSender interface {
314-
getHost() host.Host
315-
getProtocols() []protocol.ID
316-
}
317-
318303
type messageSender struct {
319304
s network.Stream
320305
r msgio.ReadCloser
321306
lk ctxMutex
322307
p peer.ID
323-
dht protocolSender
308+
dht *IpfsDHT
324309

325310
invalid bool
326311
singleMes int
@@ -361,7 +346,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
361346
// We only want to speak to peers using our primary protocols. We do not want to query any peer that only speaks
362347
// one of the secondary "server" protocols that we happen to support (e.g. older nodes that we can respond to for
363348
// backwards compatibility reasons).
364-
nstr, err := ms.dht.getHost().NewStream(ctx, ms.p, ms.dht.getProtocols()...)
349+
nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
365350
if err != nil {
366351
return err
367352
}

nesting.go

Lines changed: 0 additions & 175 deletions
This file was deleted.

nesting/nesting.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package nesting
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/hashicorp/go-multierror"
7+
"github.com/ipfs/go-cid"
8+
ci "github.com/libp2p/go-libp2p-core/crypto"
9+
"github.com/libp2p/go-libp2p-core/routing"
10+
"github.com/pkg/errors"
11+
12+
"github.com/libp2p/go-libp2p-core/host"
13+
"github.com/libp2p/go-libp2p-core/peer"
14+
15+
"github.com/libp2p/go-libp2p-kad-dht"
16+
)
17+
18+
// DHT implements the routing interface to
19+
type DHT struct {
20+
Inner, Outer *dht.IpfsDHT
21+
}
22+
23+
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
24+
// guarantee, but we can use them to aid refactoring.
25+
var (
26+
_ routing.ContentRouting = (*DHT)(nil)
27+
_ routing.Routing = (*DHT)(nil)
28+
_ routing.PeerRouting = (*DHT)(nil)
29+
_ routing.PubKeyFetcher = (*DHT)(nil)
30+
_ routing.ValueStore = (*DHT)(nil)
31+
)
32+
33+
func New(ctx context.Context, h host.Host, innerOptions []dht.Option, outerOptions []dht.Option) (*DHT, error) {
34+
inner, err := dht.New(ctx, h, innerOptions...)
35+
if err != nil {
36+
return nil, err
37+
}
38+
39+
outer, err := dht.New(ctx, h, outerOptions...)
40+
if err != nil {
41+
return nil, err
42+
}
43+
44+
d := &DHT{
45+
Inner: inner,
46+
Outer: outer,
47+
}
48+
49+
return d, nil
50+
}
51+
52+
func (dht *DHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
53+
var innerResult []peer.ID
54+
peerCh, err := dht.Inner.GetClosestPeersSeeded(ctx, key, nil)
55+
if err == nil {
56+
innerResult = getPeersFromCh(peerCh)
57+
}
58+
59+
outerResultCh, err := dht.Outer.GetClosestPeersSeeded(ctx, key, innerResult)
60+
if err != nil {
61+
return nil, err
62+
}
63+
64+
return getPeersFromCh(outerResultCh), nil
65+
}
66+
67+
func getPeersFromCh(peerCh <-chan peer.ID) []peer.ID {
68+
var peers []peer.ID
69+
for p := range peerCh {
70+
peers = append(peers, p)
71+
}
72+
return peers
73+
}
74+
75+
func (dht *DHT) GetPublicKey(ctx context.Context, id peer.ID) (ci.PubKey, error) {
76+
panic("implement me")
77+
}
78+
79+
func (dht *DHT) Provide(ctx context.Context, cid cid.Cid, b bool) error {
80+
panic("implement me")
81+
}
82+
83+
func (dht *DHT) FindProvidersAsync(ctx context.Context, cid cid.Cid, i int) <-chan peer.AddrInfo {
84+
panic("implement me")
85+
}
86+
87+
func (dht *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
88+
panic("implement me")
89+
}
90+
91+
func (dht *DHT) PutValue(ctx context.Context, s string, bytes []byte, option ...routing.Option) error {
92+
panic("implement me")
93+
}
94+
95+
func (dht *DHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) {
96+
panic("implement me")
97+
}
98+
99+
func (dht *DHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) {
100+
panic("implement me")
101+
}
102+
103+
func (dht *DHT) Bootstrap(ctx context.Context) error {
104+
errI := dht.Inner.Bootstrap(ctx)
105+
errO := dht.Outer.Bootstrap(ctx)
106+
107+
errs := make([]error, 0, 2)
108+
if errI != nil {
109+
errs = append(errs, errors.Wrap(errI, fmt.Sprintf("failed to bootstrap inner dht")))
110+
}
111+
if errO != nil {
112+
errs = append(errs, errors.Wrap(errI, fmt.Sprintf("failed to bootstrap outer dht")))
113+
}
114+
115+
switch len(errs) {
116+
case 0:
117+
return nil
118+
case 1:
119+
return errs[0]
120+
default:
121+
return multierror.Append(errs[0], errs[1:]...)
122+
}
123+
}

0 commit comments

Comments
 (0)