Skip to content

Commit ebbb3d9

Browse files
authored
Merge pull request #1392 from 0chain/fix/upload-resume
Fix resume upload
2 parents 5b5fb10 + 264f6c4 commit ebbb3d9

File tree

5 files changed

+21
-13
lines changed

5 files changed

+21
-13
lines changed

code/go/0chain.net/blobbercore/allocation/connection.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,6 @@ func SaveFileChange(connectionID, pathHash, fileName string, cmd FileCommand, is
193193
hasher := filestore.GetNewCommitHasher(contentSize)
194194
change.hasher = hasher
195195
change.seqPQ = seqpriorityqueue.NewSeqPriorityQueue(contentSize)
196-
hasher.WG.Add(1)
197196
go hasher.Start(connectionObj.ctx, connectionID, connectionObj.AllocationID, fileName, pathHash, change.seqPQ)
198197
saveChange = true
199198
}
@@ -203,6 +202,7 @@ func SaveFileChange(connectionID, pathHash, fileName string, cmd FileCommand, is
203202
if change.isFinalized {
204203
return false, nil
205204
}
205+
206206
if isFinal {
207207
change.isFinalized = true
208208
change.seqPQ.Done(seqpriorityqueue.UploadData{

code/go/0chain.net/blobbercore/filestore/storage.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,9 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
276276
nodeSie := getNodesSize(fileData.Size, util.MaxMerkleLeavesSize)
277277
fileSize := rStat.Size() - nodeSie - FMTSize
278278
now := time.Now()
279-
err = fileData.Hasher.Wait(conID, allocID, fileData.Name, filePathHash)
279+
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
280+
defer cancel()
281+
err = fileData.Hasher.Wait(ctx, conID, allocID, fileData.Name, filePathHash)
280282
if err != nil {
281283
return false, common.NewError("hasher_wait_error", err.Error())
282284
}

code/go/0chain.net/blobbercore/filestore/store_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,6 @@ func TestStoreStorageWriteAndCommit(t *testing.T) {
273273
tF, err := os.Stat(tempFilePath)
274274
require.Nil(t, err)
275275
seqPQ := seqpriorityqueue.NewSeqPriorityQueue(int64(size))
276-
hasher.WG.Add(1)
277276
go hasher.Start(context.TODO(), test.connID, test.allocID, test.fileName, pathHash, seqPQ)
278277
seqPQ.Done(seqpriorityqueue.UploadData{
279278
Offset: 0,
@@ -365,7 +364,6 @@ func TestDeletePreCommitDir(t *testing.T) {
365364
nodeSize := getNodesSize(int64(size), util.MaxMerkleLeavesSize)
366365
require.Equal(t, int64(size), tF.Size()-nodeSize-FMTSize)
367366
seqPQ := seqpriorityqueue.NewSeqPriorityQueue(int64(size))
368-
hasher.WG.Add(1)
369367
go hasher.Start(context.TODO(), connID, allocID, fileName, pathHash, seqPQ)
370368
seqPQ.Done(seqpriorityqueue.UploadData{
371369
Offset: 0,
@@ -401,7 +399,6 @@ func TestDeletePreCommitDir(t *testing.T) {
401399
_, err = os.Stat(tempFilePath)
402400
require.Nil(t, err)
403401
seqPQ = seqpriorityqueue.NewSeqPriorityQueue(int64(size))
404-
hasher.WG.Add(1)
405402
go hasher.Start(context.TODO(), connID, allocID, fileName, pathHash, seqPQ)
406403
seqPQ.Done(seqpriorityqueue.UploadData{
407404
Offset: 0,
@@ -476,7 +473,6 @@ func TestStorageUploadUpdate(t *testing.T) {
476473
nodeSize := getNodesSize(int64(size), util.MaxMerkleLeavesSize)
477474
require.Equal(t, int64(size), tF.Size()-nodeSize-FMTSize)
478475
seqPQ := seqpriorityqueue.NewSeqPriorityQueue(int64(size))
479-
hasher.WG.Add(1)
480476
go hasher.Start(context.TODO(), connID, allocID, fileName, pathHash, seqPQ)
481477
seqPQ.Done(seqpriorityqueue.UploadData{
482478
Offset: 0,

code/go/0chain.net/blobbercore/filestore/tree_validation.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ type CommitHasher struct {
409409
fmt *fixedMerkleTree
410410
vt *validationTree
411411
isInitialized bool
412-
WG sync.WaitGroup
412+
doneChan chan struct{}
413413
hashErr error
414414
dataSize int64
415415
}
@@ -419,12 +419,13 @@ func GetNewCommitHasher(dataSize int64) *CommitHasher {
419419
c.fmt = getNewFixedMerkleTree()
420420
c.vt = getNewValidationTree(dataSize)
421421
c.isInitialized = true
422+
c.doneChan = make(chan struct{})
422423
c.dataSize = dataSize
423424
return c
424425
}
425426

426427
func (c *CommitHasher) Start(ctx context.Context, connID, allocID, fileName, filePathHash string, seqPQ *seqpriorityqueue.SeqPriorityQueue) {
427-
defer c.WG.Done()
428+
defer close(c.doneChan)
428429
tempFilePath := GetFileStore().GetTempFilePath(allocID, connID, fileName, filePathHash)
429430
f, err := os.Open(tempFilePath)
430431
if err != nil {
@@ -453,6 +454,8 @@ func (c *CommitHasher) Start(ctx context.Context, connID, allocID, fileName, fil
453454
default:
454455
}
455456
toFinalize = true
457+
} else if pq.DataBytes == 0 {
458+
continue
456459
}
457460
logging.Logger.Info("hasher_pop", zap.Int64("offset", pq.Offset), zap.Int64("dataBytes", pq.DataBytes), zap.Any("toFinalize", toFinalize), zap.Int64("dataSize", c.dataSize), zap.String("filename", fileName), zap.Int64("totalWritten", totalWritten))
458461
bufSize := 2 * BufferSize
@@ -487,9 +490,13 @@ func (c *CommitHasher) Start(ctx context.Context, connID, allocID, fileName, fil
487490
}
488491
}
489492

490-
func (c *CommitHasher) Wait(connID, allocID, fileName, filePathHash string) error {
491-
c.WG.Wait()
492-
return c.hashErr
493+
func (c *CommitHasher) Wait(ctx context.Context, connID, allocID, fileName, filePathHash string) error {
494+
select {
495+
case <-c.doneChan:
496+
return c.hashErr
497+
case <-ctx.Done():
498+
return ctx.Err()
499+
}
493500
}
494501

495502
func (c *CommitHasher) Write(b []byte) (int, error) {

code/go/0chain.net/blobbercore/seqpriorityqueue/seqpriorityqueue.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (pq *SeqPriorityQueue) Done(v UploadData) {
7878

7979
func (pq *SeqPriorityQueue) Popup() UploadData {
8080
pq.lock.Lock()
81-
for pq.queue.Len() == 0 && !pq.done || (pq.queue.Len() > 0 && pq.queue[0].Offset != pq.next) {
81+
for pq.queue.Len() == 0 || (!pq.done && pq.queue[0].Offset > pq.next) {
8282
pq.cv.Wait()
8383
}
8484
if pq.done {
@@ -91,8 +91,11 @@ func (pq *SeqPriorityQueue) Popup() UploadData {
9191
retItem := UploadData{
9292
Offset: pq.next,
9393
}
94-
for pq.queue.Len() > 0 && pq.queue[0].Offset == pq.next {
94+
for pq.queue.Len() > 0 && pq.queue[0].Offset <= pq.next {
9595
item := heap.Pop(&pq.queue).(UploadData)
96+
if item.Offset < pq.next {
97+
continue
98+
}
9699
pq.next += item.DataBytes
97100
}
98101
retItem.DataBytes = pq.next - retItem.Offset

0 commit comments

Comments
 (0)