@@ -5,14 +5,17 @@ import (
5
5
"encoding/json"
6
6
"errors"
7
7
"fmt"
8
+ "strconv"
8
9
"sync"
10
+ "sync/atomic"
9
11
"time"
10
12
11
13
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
12
14
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
13
15
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
14
16
"github.com/0chain/blobber/code/go/0chain.net/core/common"
15
17
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
18
+ "github.com/0chain/common/core/util/wmpt"
16
19
"github.com/0chain/gosdk/constants"
17
20
"golang.org/x/sync/errgroup"
18
21
@@ -34,6 +37,7 @@ type AllocationChangeProcessor interface {
34
37
DeleteTempFile () error
35
38
ApplyChange (ctx context.Context , rootRef * reference.Ref , change * AllocationChange , allocationRoot string ,
36
39
ts common.Timestamp , fileIDMeta map [string ]string ) (* reference.Ref , error )
40
+ ApplyChangeV2 (ctx context.Context , allocationRoot , clientPubKey string , numFiles * atomic.Int32 , ts common.Timestamp , trie * wmpt.WeightedMerkleTrie , collector reference.QueryCollector ) (int64 , error )
37
41
GetPath () []string
38
42
Marshal () (string , error )
39
43
Unmarshal (string ) error
@@ -74,6 +78,7 @@ type AllocationChange struct {
74
78
Input string `gorm:"column:input"`
75
79
FilePath string `gorm:"-"`
76
80
LookupHash string `gorm:"column:lookup_hash;size:64"`
81
+ AllocationID string `gorm:"-" json:"-"`
77
82
datastore.ModelWithTS
78
83
}
79
84
@@ -275,10 +280,47 @@ func (cc *AllocationChangeCollector) ApplyChanges(ctx context.Context, allocatio
275
280
if err != nil {
276
281
return rootRef , err
277
282
}
278
- err = collector .Finalize (ctx )
283
+ //Ignore ref cache for version 1 of storage
284
+ err = collector .Finalize (ctx , "" , "" )
279
285
return rootRef , err
280
286
}
281
287
288
+ func (cc * AllocationChangeCollector ) ApplyChangesV2 (ctx context.Context , allocationRoot , clientPubKey string , numFiles * atomic.Int32 , maxFileChange int32 , ts common.Timestamp , trie * wmpt.WeightedMerkleTrie ) error {
289
+ now := time .Now ()
290
+ collector := reference .NewCollector (len (cc .Changes ))
291
+ eg , _ := errgroup .WithContext (context .TODO ())
292
+ eg .SetLimit (10 )
293
+ cc .Size = 0
294
+ for idx , change := range cc .Changes {
295
+ change .AllocationID = cc .AllocationID
296
+ changeIndex := idx
297
+ eg .Go (func () error {
298
+ changeProcessor := cc .AllocationChanges [changeIndex ]
299
+ sizeChange , err := changeProcessor .ApplyChangeV2 (ctx , allocationRoot , clientPubKey , numFiles , ts , trie , collector )
300
+ if err != nil {
301
+ logging .Logger .Error ("ApplyChangesV2Error" , zap .Error (err ))
302
+ return err
303
+ }
304
+ atomic .AddInt64 (& cc .Size , sizeChange )
305
+ return nil
306
+ })
307
+
308
+ }
309
+ err := eg .Wait ()
310
+ if err != nil {
311
+ logging .Logger .Error ("ApplyChangesV2" , zap .Error (err ))
312
+ return err
313
+ }
314
+ if numFiles .Load () > int32 (maxFileChange ) {
315
+ return common .NewError ("max_file_change" , "Max file change exceeded " + strconv .Itoa (int (maxFileChange )))
316
+ }
317
+ elapsedApplyChanges := time .Since (now )
318
+ err = collector .Finalize (ctx , cc .AllocationID , allocationRoot )
319
+ elapsedFinalize := time .Since (now ) - elapsedApplyChanges
320
+ logging .Logger .Info ("ApplyChangesV2" , zap .String ("allocation_id" , cc .AllocationID ), zap .Duration ("elapsed_apply_changes" , elapsedApplyChanges ), zap .Duration ("elapsed_finalize" , elapsedFinalize ), zap .Int ("changes" , len (cc .Changes )))
321
+ return err
322
+ }
323
+
282
324
func (a * AllocationChangeCollector ) CommitToFileStore (ctx context.Context ) error {
283
325
// Limit can be configured at runtime, this number will depend on the number of active allocations
284
326
eg , _ := errgroup .WithContext (ctx )
@@ -313,10 +355,30 @@ type Result struct {
313
355
}
314
356
315
357
// TODO: Need to speed up this function
316
- func (a * AllocationChangeCollector ) MoveToFilestore (ctx context.Context ) error {
358
+ func (a * AllocationChangeCollector ) MoveToFilestore (ctx context.Context , allocationObj * Allocation ) error {
317
359
318
360
logging .Logger .Info ("Move to filestore" , zap .String ("allocation_id" , a .AllocationID ))
319
- err := deleteFromFileStore (ctx , a .AllocationID )
361
+ if allocationObj .IsRedeemRequired {
362
+ err := datastore .GetStore ().WithNewTransaction (func (ctx context.Context ) error {
363
+ allocationObj .IsRedeemRequired = false
364
+
365
+ updateMap := map [string ]interface {}{
366
+ "is_redeem_required" : false ,
367
+ }
368
+ updateOption := func (a * Allocation ) {
369
+ a .IsRedeemRequired = false
370
+ }
371
+
372
+ if err := Repo .UpdateAllocation (ctx , allocationObj , updateMap , updateOption ); err != nil {
373
+ return common .NewError ("allocation_write_error" , "Error persisting the allocation object" )
374
+ }
375
+ return nil
376
+ })
377
+ if err != nil {
378
+ return err
379
+ }
380
+ }
381
+ err := deleteFromFileStore (a .AllocationID )
320
382
if err != nil {
321
383
return err
322
384
}
@@ -393,7 +455,7 @@ func (a *AllocationChangeCollector) MoveToFilestore(ctx context.Context) error {
393
455
return err
394
456
}
395
457
396
- func deleteFromFileStore (ctx context. Context , allocationID string ) error {
458
+ func deleteFromFileStore (allocationID string ) error {
397
459
limitCh := make (chan struct {}, 10 )
398
460
wg := & sync.WaitGroup {}
399
461
var results []Result
@@ -460,6 +522,140 @@ func deleteFromFileStore(ctx context.Context, allocationID string) error {
460
522
})
461
523
}
462
524
525
+ func (a * AllocationChangeCollector ) MoveToFilestoreV2 (ctx context.Context , allocationObj * Allocation , allocationRoot string ) error {
526
+ logging .Logger .Info ("Move to filestore v2" , zap .String ("allocation_id" , a .AllocationID ))
527
+ now := time .Now ()
528
+ if allocationObj .IsRedeemRequired {
529
+ err := datastore .GetStore ().WithNewTransaction (func (ctx context.Context ) error {
530
+ allocationObj .IsRedeemRequired = false
531
+
532
+ updateMap := map [string ]interface {}{
533
+ "is_redeem_required" : false ,
534
+ }
535
+ updateOption := func (a * Allocation ) {
536
+ a .IsRedeemRequired = false
537
+ }
538
+ tx := datastore .GetStore ().GetTransaction (ctx )
539
+ if err := tx .Exec ("SET LOCAL synchronous_commit TO OFF" ).Error ; err != nil {
540
+ return err
541
+ }
542
+
543
+ if err := Repo .UpdateAllocation (ctx , allocationObj , updateMap , updateOption ); err != nil {
544
+ return common .NewError ("allocation_write_error" , "Error persisting the allocation object" )
545
+ }
546
+ return nil
547
+ })
548
+ if err != nil {
549
+ return err
550
+ }
551
+ }
552
+ elapsedUpdateAllocation := time .Since (now )
553
+
554
+ var (
555
+ refs []* reference.Ref
556
+ useRefCache bool
557
+ deletedRefs []* reference.Ref
558
+ )
559
+ refCache := reference .GetRefCache (a .AllocationID )
560
+ defer reference .DeleteRefCache (a .AllocationID )
561
+ if refCache != nil && refCache .AllocationRoot == allocationRoot {
562
+ useRefCache = true
563
+ refs = refCache .CreatedRefs
564
+ deletedRefs = refCache .DeletedRefs
565
+ } else if refCache != nil && refCache .AllocationRoot != allocationRoot {
566
+ logging .Logger .Error ("Ref cache is not valid" , zap .String ("allocation_id" , a .AllocationID ), zap .String ("ref_cache_root" , refCache .AllocationRoot ), zap .String ("allocation_root" , allocationRoot ))
567
+ } else {
568
+ logging .Logger .Error ("Ref cache is nil" , zap .String ("allocation_id" , a .AllocationID ))
569
+ }
570
+ err := deleteFromFileStoreV2 (a .AllocationID , deletedRefs , useRefCache )
571
+ if err != nil {
572
+ return err
573
+ }
574
+ elapsedDeleteFromFilestore := time .Since (now ) - elapsedUpdateAllocation
575
+
576
+ limitCh := make (chan struct {}, 12 )
577
+ wg := & sync.WaitGroup {}
578
+ if ! useRefCache {
579
+ tx := datastore .GetStore ().GetTransaction (ctx )
580
+ err = tx .Model (& reference.Ref {}).Select ("lookup_hash" ).Where ("allocation_id=? AND allocation_root=? AND type=?" , a .AllocationID , allocationRoot , reference .FILE ).Find (& refs ).Error
581
+ if err != nil {
582
+ logging .Logger .Error ("Error while moving files to filestore" , zap .Error (err ))
583
+ return err
584
+ }
585
+ }
586
+
587
+ for _ , ref := range refs {
588
+
589
+ limitCh <- struct {}{}
590
+ wg .Add (1 )
591
+ refLookupHash := ref .LookupHash
592
+ go func () {
593
+ defer func () {
594
+ <- limitCh
595
+ wg .Done ()
596
+ }()
597
+ err := filestore .GetFileStore ().MoveToFilestore (a .AllocationID , refLookupHash , filestore .VERSION )
598
+ if err != nil {
599
+ logging .Logger .Error (fmt .Sprintf ("Error while moving file: %s" , err .Error ()))
600
+ }
601
+
602
+ }()
603
+ }
604
+
605
+ wg .Wait ()
606
+ elapsedMove := time .Since (now ) - elapsedUpdateAllocation - elapsedDeleteFromFilestore
607
+ logging .Logger .Info ("moveToFilestoreV2" , zap .Duration ("elapsedAllocation" , elapsedUpdateAllocation ), zap .Duration ("elapsedDelete" , elapsedDeleteFromFilestore ), zap .Duration ("elapsedMove" , elapsedMove ), zap .Duration ("elapsedTotal" , time .Since (now )), zap .Bool ("useRefCache" , useRefCache ), zap .Int ("createRefs" , len (refs )), zap .Int ("deleteRefs" , len (deletedRefs )))
608
+ return nil
609
+ }
610
+
611
+ func deleteFromFileStoreV2 (allocationID string , deletedRefs []* reference.Ref , useRefCache bool ) error {
612
+ limitCh := make (chan struct {}, 12 )
613
+ wg := & sync.WaitGroup {}
614
+ var results []* reference.Ref
615
+ if useRefCache {
616
+ results = deletedRefs
617
+ }
618
+
619
+ return datastore .GetStore ().WithNewTransaction (func (ctx context.Context ) error {
620
+ db := datastore .GetStore ().GetTransaction (ctx )
621
+ if ! useRefCache {
622
+ err := db .Model (& reference.Ref {}).Unscoped ().Select ("lookup_hash" ).
623
+ Where ("allocation_id=? AND type=? AND deleted_at is not NULL" , allocationID , reference .FILE ).
624
+ Find (& results ).Error
625
+ if err != nil && err != gorm .ErrRecordNotFound {
626
+ logging .Logger .Error ("DeleteFromFileStore" , zap .Error (err ))
627
+ return err
628
+ }
629
+ }
630
+
631
+ for _ , res := range results {
632
+ limitCh <- struct {}{}
633
+ wg .Add (1 )
634
+ resLookupHash := res .LookupHash
635
+ go func () {
636
+ defer func () {
637
+ <- limitCh
638
+ wg .Done ()
639
+ }()
640
+
641
+ err := filestore .GetFileStore ().DeleteFromFilestore (allocationID , resLookupHash ,
642
+ filestore .VERSION )
643
+ if err != nil {
644
+ logging .Logger .Error (fmt .Sprintf ("Error while deleting file: %s" , err .Error ()),
645
+ zap .String ("validation_root" , resLookupHash ))
646
+ }
647
+ }()
648
+
649
+ }
650
+ wg .Wait ()
651
+
652
+ return db .Model (& reference.Ref {}).Unscoped ().
653
+ Delete (& reference.Ref {},
654
+ "allocation_id = ? AND deleted_at IS NOT NULL" ,
655
+ allocationID ).Error
656
+ })
657
+ }
658
+
463
659
// Note: We are also fetching refPath for srcPath in copy operation
464
660
func (a * AllocationChangeCollector ) GetRootRef (ctx context.Context ) (* reference.Ref , error ) {
465
661
paths := make ([]string , 0 )
0 commit comments