Skip to content

Commit 84d8f8a

Browse files
committed
Introduce database versioning and migrations
The fix for #238 does not mean that everything is fine. We will have databases which have the wrong value/priority sets written and this would only fix itself on new writes or deletes to the same key. So we are unfortunately forced to manually fix it on start. For this we introduce a data migration. During a fresh start, we will then find all the keys affected by tombstones and loop them, finding the best value for them (the correct one) and fixing them. Once done we record that we are on version=1 and don't run this again. Future fuckups can be fixed with other migrations.
1 parent 6d2f62f commit 84d8f8a

File tree

3 files changed

+339
-65
lines changed

3 files changed

+339
-65
lines changed

crdt.go

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ const (
4848
setNs = "s" // set
4949
processedBlocksNs = "b" // blocks
5050
dirtyBitKey = "d" // dirty
51+
versionKey = "crdt_version"
5152
)
5253

5354
// Common errors.
@@ -86,9 +87,9 @@ type Options struct {
8687
// element is successfully removed from the datastore (either by a
8788
// local or remote update). Unordered and concurrent updates may
8889
// result in the DeleteHook being triggered even though the element is
89-
// still present in the datastore because it was re-added. If that is
90-
// relevant, use Has() to check if the removed element is still part
91-
// of the datastore.
90+
// still present in the datastore because it was re-added or not fully
91+
// tombstoned. If that is relevant, use Has() to check if the removed
92+
// element is still part of the datastore.
9293
DeleteHook func(k ds.Key)
9394
// NumWorkers specifies the number of workers ready to walk DAGs
9495
NumWorkers int
@@ -257,7 +258,7 @@ func New(
257258
}
258259

259260
ctx, cancel := context.WithCancel(context.Background())
260-
set, err := newCRDTSet(ctx, store, fullSetNs, opts.Logger, setPutHook, setDeleteHook)
261+
set, err := newCRDTSet(ctx, store, fullSetNs, dagSyncer, opts.Logger, setPutHook, setDeleteHook)
261262
if err != nil {
262263
cancel()
263264
return nil, errors.Wrap(err, "error setting up crdt set")
@@ -285,8 +286,15 @@ func New(
285286
queuedChildren: newCidSafeSet(),
286287
}
287288

289+
err = dstore.applyMigrations(ctx)
290+
if err != nil {
291+
cancel()
292+
return nil, err
293+
}
294+
288295
headList, maxHeight, err := dstore.heads.List()
289296
if err != nil {
297+
cancel()
290298
return nil, err
291299
}
292300
dstore.logger.Infof(
@@ -576,7 +584,7 @@ func (store *Datastore) handleBlock(c cid.Cid) error {
576584
// Ignore already processed blocks.
577585
// This includes the case when the block is a current
578586
// head.
579-
isProcessed, err := store.isProcessed(c)
587+
isProcessed, err := store.isProcessed(store.ctx, c)
580588
if err != nil {
581589
return errors.Wrapf(err, "error checking for known block %s", c)
582590
}
@@ -733,11 +741,11 @@ func (store *Datastore) processedBlockKey(c cid.Cid) ds.Key {
733741
return store.namespace.ChildString(processedBlocksNs).ChildString(dshelp.MultihashToDsKey(c.Hash()).String())
734742
}
735743

736-
func (store *Datastore) isProcessed(c cid.Cid) (bool, error) {
744+
func (store *Datastore) isProcessed(ctx context.Context, c cid.Cid) (bool, error) {
737745
return store.store.Has(store.ctx, store.processedBlockKey(c))
738746
}
739747

740-
func (store *Datastore) markProcessed(c cid.Cid) error {
748+
func (store *Datastore) markProcessed(ctx context.Context, c cid.Cid) error {
741749
return store.store.Put(store.ctx, store.processedBlockKey(c), nil)
742750
}
743751

@@ -785,7 +793,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u
785793

786794
// Record that we have processed the node so that any other worker
787795
// can skip it.
788-
err = store.markProcessed(current)
796+
err = store.markProcessed(store.ctx, current)
789797
if err != nil {
790798
return nil, errors.Wrapf(err, "error recording %s as processed", current)
791799
}
@@ -827,7 +835,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u
827835
return nil, errors.Wrapf(err, "error checking if %s is head", child)
828836
}
829837

830-
isProcessed, err := store.isProcessed(child)
838+
isProcessed, err := store.isProcessed(store.ctx, child)
831839
if err != nil {
832840
return nil, errors.Wrapf(err, "error checking for known block %s", child)
833841
}
@@ -959,7 +967,7 @@ func (store *Datastore) repairDAG() error {
959967
}
960968
cancel()
961969

962-
isProcessed, err := store.isProcessed(cur)
970+
isProcessed, err := store.isProcessed(store.ctx, cur)
963971
if err != nil {
964972
return errors.Wrapf(err, "error checking for reprocessed block %s", cur)
965973
}
@@ -1364,7 +1372,9 @@ func (store *Datastore) printDAGRec(from cid.Cid, depth uint64, ng *crdtNodeGett
13641372
return nil
13651373
}
13661374

1367-
nd, delta, err := ng.GetDelta(context.Background(), from)
1375+
ctx, cancel := context.WithTimeout(store.ctx, store.opts.DAGSyncerTimeout)
1376+
defer cancel()
1377+
nd, delta, err := ng.GetDelta(ctx, from)
13681378
if err != nil {
13691379
return err
13701380
}
@@ -1385,7 +1395,19 @@ func (store *Datastore) printDAGRec(from cid.Cid, depth uint64, ng *crdtNodeGett
13851395
cidStr = cidStr[len(cidStr)-4:]
13861396
line += fmt.Sprintf("%s,", cidStr)
13871397
}
1388-
line += "}:"
1398+
line += "}"
1399+
1400+
processed, err := store.isProcessed(store.ctx, nd.Cid())
1401+
if err != nil {
1402+
return err
1403+
}
1404+
1405+
if !processed {
1406+
line += " Unprocessed!"
1407+
}
1408+
1409+
line += ":"
1410+
13891411
fmt.Println(line)
13901412
for _, l := range nd.Links() {
13911413
store.printDAGRec(l.Cid, depth+1, ng, set)
@@ -1433,7 +1455,9 @@ func (store *Datastore) dotDAGRec(w io.Writer, from cid.Cid, depth uint64, ng *c
14331455
return nil
14341456
}
14351457

1436-
nd, delta, err := ng.GetDelta(context.Background(), from)
1458+
ctx, cancel := context.WithTimeout(store.ctx, store.opts.DAGSyncerTimeout)
1459+
defer cancel()
1460+
nd, delta, err := ng.GetDelta(ctx, from)
14371461
if err != nil {
14381462
return err
14391463
}

crdt_test.go

Lines changed: 144 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package crdt
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"math/rand"
78
"os"
@@ -26,7 +27,7 @@ import (
2627
)
2728

2829
var numReplicas = 15
29-
var debug = false
30+
var debug = true
3031

3132
const (
3233
mapStore = iota
@@ -235,7 +236,7 @@ func makeNReplicas(t testing.TB, n int, opts *Options) ([]*Datastore, func()) {
235236
name: fmt.Sprintf("r#%d: ", i),
236237
l: DefaultOptions().Logger,
237238
}
238-
replicaOpts[i].RebroadcastInterval = time.Second * 10
239+
replicaOpts[i].RebroadcastInterval = time.Second * 1
239240
replicaOpts[i].NumWorkers = 5
240241
replicaOpts[i].DAGSyncerTimeout = time.Second
241242
}
@@ -817,56 +818,6 @@ func TestCRDTBroadcastBackwardsCompat(t *testing.T) {
817818
}
818819
}
819820

820-
// There is no easy way to see if the bloom filter is doing its job without
821-
// wiring some sort of metric or benchmarking. Instead, this just hits the
822-
// 3 places relevant to bloom filter:
823-
// * When adding a tomb
824-
// * When checking if something is tombstoned
825-
// * When priming the filter
826-
//
827-
// The main thing is to manually verify (via printlns) that the bloom filter
828-
// is used with the expected key everywhere: i.e. "/mykey" and not
829-
// "mykey". "/something/else" and not "/something". Protip: it has been
830-
// verified and it does that.
831-
func TestBloomingTombstones(t *testing.T) {
832-
ctx := context.Background()
833-
replicas, closeReplicas := makeNReplicas(t, 1, nil)
834-
defer closeReplicas()
835-
836-
k := ds.NewKey("hola/adios/")
837-
err := replicas[0].Put(ctx, k, []byte("bytes"))
838-
if err != nil {
839-
t.Fatal(err)
840-
}
841-
842-
err = replicas[0].Delete(ctx, k)
843-
if err != nil {
844-
t.Fatal(err)
845-
}
846-
847-
err = replicas[0].Put(ctx, k, []byte("bytes"))
848-
if err != nil {
849-
t.Fatal(err)
850-
}
851-
852-
q := query.Query{
853-
KeysOnly: false,
854-
}
855-
results, err := replicas[0].Query(ctx, q)
856-
if err != nil {
857-
t.Fatal(err)
858-
}
859-
defer results.Close()
860-
861-
for r := range results.Next() {
862-
if r.Error != nil {
863-
t.Error(r.Error)
864-
}
865-
}
866-
867-
replicas[0].set.primeBloomFilter(ctx)
868-
}
869-
870821
func BenchmarkQueryElements(b *testing.B) {
871822
ctx := context.Background()
872823
replicas, closeReplicas := makeNReplicas(b, 1, nil)
@@ -914,3 +865,144 @@ func TestRandomizeInterval(t *testing.T) {
914865
prevR = r
915866
}
916867
}
868+
869+
func TestCRDTPutPutDelete(t *testing.T) {
870+
replicas, closeReplicas := makeNReplicas(t, 2, nil)
871+
defer closeReplicas()
872+
873+
ctx := context.Background()
874+
875+
br0 := replicas[0].broadcaster.(*mockBroadcaster)
876+
br0.dropProb = 101
877+
878+
br1 := replicas[1].broadcaster.(*mockBroadcaster)
879+
br1.dropProb = 101
880+
881+
k := ds.NewKey("k1")
882+
883+
// r0 - put put delete
884+
err := replicas[0].Put(ctx, k, []byte("r0-1"))
885+
if err != nil {
886+
t.Fatal(err)
887+
}
888+
err = replicas[0].Put(ctx, k, []byte("r0-2"))
889+
if err != nil {
890+
t.Fatal(err)
891+
}
892+
err = replicas[0].Delete(ctx, k)
893+
if err != nil {
894+
t.Fatal(err)
895+
}
896+
897+
// r1 - put
898+
err = replicas[1].Put(ctx, k, []byte("r1-1"))
899+
if err != nil {
900+
t.Fatal(err)
901+
}
902+
903+
br0.dropProb = 0
904+
br1.dropProb = 0
905+
906+
time.Sleep(5 * time.Second)
907+
908+
r0Res, err := replicas[0].Get(ctx, ds.NewKey("k1"))
909+
if err != nil {
910+
if !errors.Is(err, ds.ErrNotFound) {
911+
t.Fatal(err)
912+
}
913+
}
914+
915+
r1Res, err := replicas[1].Get(ctx, ds.NewKey("k1"))
916+
if err != nil {
917+
t.Fatal(err)
918+
}
919+
closeReplicas()
920+
921+
if string(r0Res) != string(r1Res) {
922+
fmt.Printf("r0Res: %s\nr1Res: %s\n", string(r0Res), string(r1Res))
923+
t.Log("r0 dag")
924+
replicas[0].PrintDAG()
925+
926+
t.Log("r1 dag")
927+
replicas[1].PrintDAG()
928+
929+
t.Fatal("r0 and r1 should have the same value")
930+
}
931+
}
932+
933+
func TestMigration0to1(t *testing.T) {
934+
replicas, closeReplicas := makeNReplicas(t, 1, nil)
935+
defer closeReplicas()
936+
replica := replicas[0]
937+
ctx := context.Background()
938+
939+
nItems := 200
940+
var keys []ds.Key
941+
// Add nItems
942+
for i := 0; i < nItems; i++ {
943+
k := ds.RandomKey()
944+
keys = append(keys, k)
945+
v := []byte(fmt.Sprintf("%d", i))
946+
err := replica.Put(ctx, k, v)
947+
if err != nil {
948+
t.Fatal(err)
949+
}
950+
951+
}
952+
953+
// Overwrite n/2 items 5 times to have multiple tombstones per key
954+
// later...
955+
for j := 0; j < 5; j++ {
956+
for i := 0; i < nItems/2; i++ {
957+
v := []byte(fmt.Sprintf("%d", i))
958+
err := replica.Put(ctx, keys[i], v)
959+
if err != nil {
960+
t.Fatal(err)
961+
}
962+
}
963+
}
964+
965+
// delete keys
966+
for i := 0; i < nItems/2; i++ {
967+
err := replica.Delete(ctx, keys[i])
968+
if err != nil {
969+
t.Fatal(err)
970+
}
971+
}
972+
973+
// And write them again
974+
for i := 0; i < nItems/2; i++ {
975+
err := replica.Put(ctx, keys[i], []byte("final value"))
976+
if err != nil {
977+
t.Fatal(err)
978+
}
979+
}
980+
981+
// And now we manually put the wrong value
982+
for i := 0; i < nItems/2; i++ {
983+
valueK := replica.set.valueKey(keys[i].String())
984+
err := replica.set.store.Put(ctx, valueK, []byte("wrong value"))
985+
if err != nil {
986+
t.Fatal(err)
987+
}
988+
err = replica.set.setPriority(ctx, replica.set.store, keys[i].String(), 1)
989+
if err != nil {
990+
t.Fatal(err)
991+
}
992+
}
993+
994+
err := replica.migrate0to1(ctx)
995+
if err != nil {
996+
t.Fatal(err)
997+
}
998+
999+
for i := 0; i < nItems/2; i++ {
1000+
v, err := replica.Get(ctx, keys[i])
1001+
if err != nil {
1002+
t.Fatal(err)
1003+
}
1004+
if string(v) != "final value" {
1005+
t.Fatalf("value for elem %d should be final value: %s", i, string(v))
1006+
}
1007+
}
1008+
}

0 commit comments

Comments
 (0)