Skip to content

Commit c2bd391

Browse files
authored
Merge pull request #1394 from 0chain/sprint-1.13
Sprint 1.13
2 parents dc9d4c4 + ebbb3d9 commit c2bd391

23 files changed

+571
-429
lines changed

code/go/0chain.net/blobbercore/allocation/allocationchange.go

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"github.com/0chain/blobber/code/go/0chain.net/core/common"
1515
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
1616
"github.com/0chain/gosdk/constants"
17-
"github.com/remeh/sizedwaitgroup"
17+
"golang.org/x/sync/errgroup"
1818

1919
"go.uber.org/zap"
2020
"gorm.io/gorm"
@@ -236,35 +236,19 @@ func (cc *AllocationChangeCollector) ApplyChanges(ctx context.Context, allocatio
236236
}
237237

238238
func (a *AllocationChangeCollector) CommitToFileStore(ctx context.Context) error {
239-
commitCtx, cancel := context.WithCancel(ctx)
240-
defer cancel()
241-
// Can be configured at runtime, this number will depend on the number of active allocations
242-
swg := sizedwaitgroup.New(5)
239+
// Limit can be configured at runtime, this number will depend on the number of active allocations
240+
eg, _ := errgroup.WithContext(ctx)
241+
eg.SetLimit(5)
243242
mut := &sync.Mutex{}
244-
var (
245-
commitError error
246-
errorMutex sync.Mutex
247-
)
248243
for _, change := range a.AllocationChanges {
249-
select {
250-
case <-commitCtx.Done():
251-
return fmt.Errorf("commit to filestore failed: %s", commitError.Error())
252-
default:
253-
}
254-
swg.Add()
255-
go func(change AllocationChangeProcessor) {
256-
err := change.CommitToFileStore(ctx, mut)
257-
if err != nil && !errors.Is(common.ErrFileWasDeleted, err) {
258-
cancel()
259-
errorMutex.Lock()
260-
commitError = err
261-
errorMutex.Unlock()
262-
}
263-
swg.Done()
264-
}(change)
244+
allocChange := change
245+
eg.Go(func() error {
246+
return allocChange.CommitToFileStore(ctx, mut)
247+
})
265248
}
266-
swg.Wait()
267-
return commitError
249+
logging.Logger.Info("Waiting for commit to filestore", zap.String("allocation_id", a.AllocationID))
250+
251+
return eg.Wait()
268252
}
269253

270254
func (a *AllocationChangeCollector) DeleteChanges(ctx context.Context) {

code/go/0chain.net/blobbercore/allocation/common_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ func (mfs *MockFileStore) WriteFile(allocID, connID string,
2929
}, nil
3030
}
3131

32+
func (mfs *MockFileStore) WriteDataToTree(allocID, connID, fileName, filePathHash string, hasher *filestore.CommitHasher) error {
33+
return nil
34+
}
35+
3236
func (mfs *MockFileStore) CommitWrite(allocID, connID string, fileData *filestore.FileInputData) (bool, error) {
3337
return true, nil
3438
}
@@ -64,6 +68,10 @@ func (mfs *MockFileStore) GetFilePathSize(allocID, contentHash, thumbHash string
6468
return 0, 0, nil
6569
}
6670

71+
func (mfs *MockFileStore) GetTempFilePath(allocID, connID, fileName, filePathHash string) string {
72+
return ""
73+
}
74+
6775
func (mfs *MockFileStore) GetBlocksMerkleTreeForChallenge(cir *filestore.ChallengeReadBlockInput) (*filestore.ChallengeResponse, error) {
6876
return nil, nil
6977
}

0 commit comments

Comments
 (0)