Skip to content

Commit 36578e2

Browse files
aarshkshah1992Stebalien
authored andcommitted
Striped locks for atomic Dht updates (#374)
Implement striped locking for datastore puts.
1 parent 3c72a7c commit 36578e2

File tree

3 files changed

+95
-8
lines changed

3 files changed

+95
-8
lines changed

dht.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ type IpfsDHT struct {
6565

6666
plk sync.Mutex
6767

68+
stripedPutLocks [256]sync.Mutex
69+
6870
protocols []protocol.ID // DHT protocols
6971
}
7072

dht_test.go

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ import (
1515
"github.com/libp2p/go-libp2p-core/peer"
1616
"github.com/libp2p/go-libp2p-core/peerstore"
1717
"github.com/libp2p/go-libp2p-core/routing"
18-
19-
multistream "github.com/multiformats/go-multistream"
18+
"github.com/multiformats/go-multistream"
2019

2120
"golang.org/x/xerrors"
2221

@@ -26,12 +25,12 @@ import (
2625
opts "github.com/libp2p/go-libp2p-kad-dht/opts"
2726
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
2827

29-
cid "github.com/ipfs/go-cid"
28+
"github.com/ipfs/go-cid"
3029
u "github.com/ipfs/go-ipfs-util"
3130
kb "github.com/libp2p/go-libp2p-kbucket"
32-
record "github.com/libp2p/go-libp2p-record"
31+
"github.com/libp2p/go-libp2p-record"
3332
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
34-
ci "github.com/libp2p/go-libp2p-testing/ci"
33+
"github.com/libp2p/go-libp2p-testing/ci"
3534
travisci "github.com/libp2p/go-libp2p-testing/ci/travis"
3635
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
3736
ma "github.com/multiformats/go-multiaddr"
@@ -78,6 +77,36 @@ func (testValidator) Validate(_ string, b []byte) error {
7877
return nil
7978
}
8079

80+
type testAtomicPutValidator struct {
81+
testValidator
82+
}
83+
84+
// selects the entry with the 'highest' last byte
85+
func (testAtomicPutValidator) Select(_ string, bs [][]byte) (int, error) {
86+
index := -1
87+
max := uint8(0)
88+
for i, b := range bs {
89+
if bytes.Equal(b, []byte("valid")) {
90+
if index == -1 {
91+
index = i
92+
}
93+
continue
94+
}
95+
96+
str := string(b)
97+
n := str[len(str)-1]
98+
if n > max {
99+
max = n
100+
index = i
101+
}
102+
103+
}
104+
if index == -1 {
105+
return -1, errors.New("no rec found")
106+
}
107+
return index, nil
108+
}
109+
81110
func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
82111
d, err := New(
83112
ctx,
@@ -1107,6 +1136,51 @@ func TestBadProtoMessages(t *testing.T) {
11071136
}
11081137
}
11091138

1139+
func TestAtomicPut(t *testing.T) {
1140+
ctx, cancel := context.WithCancel(context.Background())
1141+
defer cancel()
1142+
1143+
d := setupDHT(ctx, t, false)
1144+
d.Validator = testAtomicPutValidator{}
1145+
1146+
// fnc to put a record
1147+
key := "testkey"
1148+
putRecord := func(value []byte) error {
1149+
rec := record.MakePutRecord(key, value)
1150+
pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
1151+
pmes.Record = rec
1152+
_, err := d.handlePutValue(ctx, "testpeer", pmes)
1153+
return err
1154+
}
1155+
1156+
// put a valid record
1157+
if err := putRecord([]byte("valid")); err != nil {
1158+
t.Fatal("should not have errored on a valid record")
1159+
}
1160+
1161+
// simultaneous puts for old & new values
1162+
values := [][]byte{[]byte("newer1"), []byte("newer7"), []byte("newer3"), []byte("newer5")}
1163+
var wg sync.WaitGroup
1164+
for _, v := range values {
1165+
wg.Add(1)
1166+
go func(v []byte) {
1167+
defer wg.Done()
1168+
putRecord(v)
1169+
}(v)
1170+
}
1171+
wg.Wait()
1172+
1173+
// get should return the newest value
1174+
pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
1175+
msg, err := d.handleGetValue(ctx, "testkey", pmes)
1176+
if err != nil {
1177+
t.Fatalf("should not have errored on final get, but got %+v", err)
1178+
}
1179+
if string(msg.GetRecord().Value) != "newer7" {
1180+
t.Fatalf("Expected 'newer7' got '%s'", string(msg.GetRecord().Value))
1181+
}
1182+
}
1183+
11101184
func TestClientModeConnect(t *testing.T) {
11111185
ctx, cancel := context.WithCancel(context.Background())
11121186
defer cancel()

handlers.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@ import (
1212
"github.com/libp2p/go-libp2p-core/peerstore"
1313
pstore "github.com/libp2p/go-libp2p-peerstore"
1414

15-
proto "github.com/gogo/protobuf/proto"
16-
cid "github.com/ipfs/go-cid"
15+
"github.com/gogo/protobuf/proto"
16+
"github.com/ipfs/go-cid"
1717
ds "github.com/ipfs/go-datastore"
1818
u "github.com/ipfs/go-ipfs-util"
1919
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
2020
recpb "github.com/libp2p/go-libp2p-record/pb"
21-
base32 "github.com/whyrusleeping/base32"
21+
"github.com/whyrusleeping/base32"
2222
)
2323

2424
// The number of closer peers to send on requests.
@@ -173,6 +173,17 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
173173

174174
dskey := convertToDsKey(rec.GetKey())
175175

176+
// fetch the striped lock for this key
177+
var indexForLock byte
178+
if len(rec.GetKey()) == 0 {
179+
indexForLock = 0
180+
} else {
181+
indexForLock = rec.GetKey()[len(rec.GetKey())-1]
182+
}
183+
lk := &dht.stripedPutLocks[indexForLock]
184+
lk.Lock()
185+
defer lk.Unlock()
186+
176187
// Make sure the new record is "better" than the record we have locally.
177188
// This prevents a record with for example a lower sequence number from
178189
// overwriting a record with a higher sequence number.

0 commit comments

Comments
 (0)