Skip to content

Commit b43de73

Browse files
authored
Merge pull request #241 from ipfs/fix/238-non-converging
Fix #238: Non converging operations with unordered deletes. Migration to fix existing datastores.
2 parents 0f14947 + 7f73711 commit b43de73

File tree

4 files changed

+501
-247
lines changed

4 files changed

+501
-247
lines changed

crdt.go

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ const (
4848
setNs = "s" // set
4949
processedBlocksNs = "b" // blocks
5050
dirtyBitKey = "d" // dirty
51+
versionKey = "crdt_version"
5152
)
5253

5354
// Common errors.
@@ -86,9 +87,9 @@ type Options struct {
8687
// element is successfully removed from the datastore (either by a
8788
// local or remote update). Unordered and concurrent updates may
8889
// result in the DeleteHook being triggered even though the element is
89-
// still present in the datastore because it was re-added. If that is
90-
// relevant, use Has() to check if the removed element is still part
91-
// of the datastore.
90+
// still present in the datastore because it was re-added or not fully
91+
// tombstoned. If that is relevant, use Has() to check if the removed
92+
// element is still part of the datastore.
9293
DeleteHook func(k ds.Key)
9394
// NumWorkers specifies the number of workers ready to walk DAGs
9495
NumWorkers int
@@ -257,7 +258,7 @@ func New(
257258
}
258259

259260
ctx, cancel := context.WithCancel(context.Background())
260-
set, err := newCRDTSet(ctx, store, fullSetNs, opts.Logger, setPutHook, setDeleteHook)
261+
set, err := newCRDTSet(ctx, store, fullSetNs, dagSyncer, opts.Logger, setPutHook, setDeleteHook)
261262
if err != nil {
262263
cancel()
263264
return nil, errors.Wrap(err, "error setting up crdt set")
@@ -285,8 +286,15 @@ func New(
285286
queuedChildren: newCidSafeSet(),
286287
}
287288

289+
err = dstore.applyMigrations(ctx)
290+
if err != nil {
291+
cancel()
292+
return nil, err
293+
}
294+
288295
headList, maxHeight, err := dstore.heads.List()
289296
if err != nil {
297+
cancel()
290298
return nil, err
291299
}
292300
dstore.logger.Infof(
@@ -576,7 +584,7 @@ func (store *Datastore) handleBlock(c cid.Cid) error {
576584
// Ignore already processed blocks.
577585
// This includes the case when the block is a current
578586
// head.
579-
isProcessed, err := store.isProcessed(c)
587+
isProcessed, err := store.isProcessed(store.ctx, c)
580588
if err != nil {
581589
return errors.Wrapf(err, "error checking for known block %s", c)
582590
}
@@ -733,11 +741,11 @@ func (store *Datastore) processedBlockKey(c cid.Cid) ds.Key {
733741
return store.namespace.ChildString(processedBlocksNs).ChildString(dshelp.MultihashToDsKey(c.Hash()).String())
734742
}
735743

736-
func (store *Datastore) isProcessed(c cid.Cid) (bool, error) {
744+
func (store *Datastore) isProcessed(ctx context.Context, c cid.Cid) (bool, error) {
737745
return store.store.Has(store.ctx, store.processedBlockKey(c))
738746
}
739747

740-
func (store *Datastore) markProcessed(c cid.Cid) error {
748+
func (store *Datastore) markProcessed(ctx context.Context, c cid.Cid) error {
741749
return store.store.Put(store.ctx, store.processedBlockKey(c), nil)
742750
}
743751

@@ -785,7 +793,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u
785793

786794
// Record that we have processed the node so that any other worker
787795
// can skip it.
788-
err = store.markProcessed(current)
796+
err = store.markProcessed(store.ctx, current)
789797
if err != nil {
790798
return nil, errors.Wrapf(err, "error recording %s as processed", current)
791799
}
@@ -827,7 +835,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u
827835
return nil, errors.Wrapf(err, "error checking if %s is head", child)
828836
}
829837

830-
isProcessed, err := store.isProcessed(child)
838+
isProcessed, err := store.isProcessed(store.ctx, child)
831839
if err != nil {
832840
return nil, errors.Wrapf(err, "error checking for known block %s", child)
833841
}
@@ -959,7 +967,7 @@ func (store *Datastore) repairDAG() error {
959967
}
960968
cancel()
961969

962-
isProcessed, err := store.isProcessed(cur)
970+
isProcessed, err := store.isProcessed(store.ctx, cur)
963971
if err != nil {
964972
return errors.Wrapf(err, "error checking for reprocessed block %s", cur)
965973
}
@@ -1364,7 +1372,9 @@ func (store *Datastore) printDAGRec(from cid.Cid, depth uint64, ng *crdtNodeGett
13641372
return nil
13651373
}
13661374

1367-
nd, delta, err := ng.GetDelta(context.Background(), from)
1375+
ctx, cancel := context.WithTimeout(store.ctx, store.opts.DAGSyncerTimeout)
1376+
defer cancel()
1377+
nd, delta, err := ng.GetDelta(ctx, from)
13681378
if err != nil {
13691379
return err
13701380
}
@@ -1385,7 +1395,19 @@ func (store *Datastore) printDAGRec(from cid.Cid, depth uint64, ng *crdtNodeGett
13851395
cidStr = cidStr[len(cidStr)-4:]
13861396
line += fmt.Sprintf("%s,", cidStr)
13871397
}
1388-
line += "}:"
1398+
line += "}"
1399+
1400+
processed, err := store.isProcessed(store.ctx, nd.Cid())
1401+
if err != nil {
1402+
return err
1403+
}
1404+
1405+
if !processed {
1406+
line += " Unprocessed!"
1407+
}
1408+
1409+
line += ":"
1410+
13891411
fmt.Println(line)
13901412
for _, l := range nd.Links() {
13911413
store.printDAGRec(l.Cid, depth+1, ng, set)
@@ -1433,7 +1455,9 @@ func (store *Datastore) dotDAGRec(w io.Writer, from cid.Cid, depth uint64, ng *c
14331455
return nil
14341456
}
14351457

1436-
nd, delta, err := ng.GetDelta(context.Background(), from)
1458+
ctx, cancel := context.WithTimeout(store.ctx, store.opts.DAGSyncerTimeout)
1459+
defer cancel()
1460+
nd, delta, err := ng.GetDelta(ctx, from)
14371461
if err != nil {
14381462
return err
14391463
}

0 commit comments

Comments
 (0)