Skip to content

Commit d150b9b

Browse files
authored
Merge pull request #1533 from 0chain/feat/recover-trie
Recover trie
2 parents 06767cc + 2d21861 commit d150b9b

File tree

4 files changed

+92
-0
lines changed

4 files changed

+92
-0
lines changed

code/go/0chain.net/blobber/flags.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ var (
2323
httpsKeyFile string
2424
httpsCertFile string
2525
hostUrl string
26+
recoverTrie bool
2627
)
2728

2829
func init() {
@@ -45,6 +46,7 @@ func init() {
4546
flag.StringVar(&hostUrl, "hosturl", "", "register url on blockchain instead of [schema://hostname+port] if it has value")
4647

4748
flag.IntVar(&grpcPort, "grpc_port", 0, "grpc_port")
49+
flag.BoolVar(&recoverTrie, "recover_trie", false, "recover_trie")
4850
}
4951

5052
func parseFlags() {

code/go/0chain.net/blobber/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
45
"github.com/0chain/blobber/code/go/0chain.net/core/common"
56
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
67
"github.com/0chain/blobber/code/go/0chain.net/core/node"
@@ -58,6 +59,11 @@ func main() {
5859
panic(err)
5960
}
6061

62+
if recoverTrie {
63+
logging.Logger.Info("Recovering trie")
64+
allocation.RecoverTrie()
65+
}
66+
6167
// todo: activate this when gRPC functionalities are implemented
6268
// go startGRPCServer()
6369

code/go/0chain.net/blobbercore/allocation/entity.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,13 @@ import (
88
"time"
99

1010
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
11+
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
12+
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
1113
"github.com/0chain/blobber/code/go/0chain.net/core/common"
14+
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
1215
"github.com/0chain/common/core/encryption"
1316
"github.com/0chain/common/core/util/wmpt"
17+
"go.uber.org/zap"
1418
"gorm.io/gorm/clause"
1519

1620
"gorm.io/gorm"
@@ -357,3 +361,52 @@ type ReadPoolRedeem struct {
357361
PoolID string `json:"pool_id"` // read pool ID
358362
Balance int64 `json:"balance"` // balance reduction
359363
}
364+
365+
func (a *Allocation) recoverTrie() error {
366+
367+
trie := wmpt.New(nil, datastore.GetBlockStore())
368+
//fetch all the files of the allocation to rebuild the trie
369+
var offsetPath string
370+
for {
371+
var (
372+
pRefs *[]reference.PaginatedRef
373+
err error
374+
)
375+
err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
376+
pRefs, _, offsetPath, err = reference.GetRefs(ctx, a.ID, "/", offsetPath, reference.FILE, 0, 100)
377+
return err
378+
})
379+
if err != nil {
380+
logging.Logger.Error("recover_trie_fetch_refs", zap.Error(err))
381+
return err
382+
}
383+
if len(*pRefs) == 0 {
384+
break
385+
}
386+
trie.SaveRoot()
387+
for _, ref := range *pRefs {
388+
decodedKey, _ := hex.DecodeString(ref.LookupHash)
389+
decodedValue, _ := hex.DecodeString(ref.FileMetaHash)
390+
err = trie.Update(decodedKey, decodedValue, uint64(ref.NumBlocks))
391+
if err != nil {
392+
logging.Logger.Error("recover_trie_update", zap.Error(err))
393+
return err
394+
}
395+
}
396+
batcher, err := trie.Commit(filestore.COLLAPSE_DEPTH)
397+
if err != nil {
398+
logging.Logger.Error("recover_trie_commit", zap.Error(err))
399+
return err
400+
}
401+
err = batcher.Commit(true)
402+
if err != nil {
403+
logging.Logger.Error("recover_trie_batcher_commit", zap.Error(err))
404+
return err
405+
}
406+
_ = trie.DeleteNodes()
407+
if len(*pRefs) < 100 {
408+
break
409+
}
410+
}
411+
return nil
412+
}

code/go/0chain.net/blobbercore/allocation/workers.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,3 +348,34 @@ func deleteAllocation(ctx context.Context, a *Allocation) (err error) {
348348
a.ID).Error
349349
return err
350350
}
351+
352+
func RecoverTrie() {
353+
var (
354+
allocs []*Allocation
355+
err error
356+
offset int64
357+
)
358+
359+
for {
360+
err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
361+
allocs, err = Repo.GetAllocations(ctx, offset)
362+
return err
363+
})
364+
if err != nil {
365+
logging.Logger.Error("recover_trie_fetch_alloc", zap.Error(err))
366+
return
367+
}
368+
if len(allocs) == 0 {
369+
return
370+
}
371+
offset += int64(len(allocs))
372+
// recover trie of each allocation
373+
for _, a := range allocs {
374+
logging.Logger.Info("recover_trie", zap.String("allocation_id", a.ID))
375+
err = a.recoverTrie()
376+
if err != nil {
377+
logging.Logger.Error("recover_trie", zap.Error(err))
378+
}
379+
}
380+
}
381+
}

0 commit comments

Comments
 (0)