Skip to content

Commit 400281c

Browse files
committed
- transfers now resumable
- file stats are read async to define the shape of the transfer. - refactoring.
1 parent a64d3ce commit 400281c

17 files changed

+840
-386
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
*.o
33
*.a
44
*.so
5+
__*.*
56

67
# Folders
78
_obj
@@ -11,6 +12,7 @@ _build/windows_amd64
1112
_wd
1213
_old
1314
.vscode
15+
*_testdata
1416

1517
# Architecture specific extensions/prefixes
1618
*.[568vq]

args.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ type arguments struct {
7272
readTokenExp int
7373
numberOfHandlesPerFile int //numberOfHandlesPerFile = defaultNumberOfHandlesPerFile
7474
numberOfFilesInBatch int //numberOfFilesInBatch = defaultNumberOfFilesInBatch
75+
transferStatusPath string
7576
}
7677

7778
//represents validated parameters
@@ -95,6 +96,7 @@ type validatedParameters struct {
9596
blobSource blobParams
9697
blobTarget blobParams
9798
perfSourceDefinitions []sources.SourceDefinition
99+
tracker *internal.TransferTracker
98100
}
99101

100102
type s3Source struct {
@@ -166,6 +168,7 @@ func (p *paramParserValidator) parseAndValidate() error {
166168
p.targetSegment = t
167169
err = p.runParseAndValidationRules(
168170
p.pvgCalculateReadersAndWorkers,
171+
p.pvgTransferStatusPathIsPresent,
169172
p.pvgBatchLimits,
170173
p.pvgHTTPTimeOut,
171174
p.pvgDupCheck,
@@ -254,6 +257,22 @@ func (p *paramParserValidator) getSourceRules() ([]parseAndValidationRule, error
254257
//**************************
255258

256259
//Global rules....
260+
func (p *paramParserValidator) pvgTransferStatusPathIsPresent() error {
261+
262+
if p.args.transferStatusPath != "" {
263+
if !p.args.quietMode{
264+
fmt.Printf("Transfer is resumable. Transfer status file:%v \n", p.args.transferStatusPath)
265+
}
266+
tracker, err := internal.NewTransferTracker(p.args.transferStatusPath)
267+
268+
if err != nil {
269+
return err
270+
}
271+
272+
p.params.tracker = tracker
273+
}
274+
return nil
275+
}
257276
func (p *paramParserValidator) pvgKeepDirectoryStructure() error {
258277
p.params.keepDirStructure = !p.args.removeDirStructure
259278
return nil

blobporter.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func init() {
4545
numberOfHandlersPerFileMsg = "Number of open handles for concurrent reads and writes per file."
4646
numberOfFilesInBatchMsg = "Maximum number of files in a transfer.\n\tIf the number is exceeded new transfers are created"
4747
readTokenExpMsg = "Expiration in minutes of the read-only access token that will be generated to read from S3 or Azure Blob sources."
48+
transferStatusFileMsg = "Transfer status file location. If set, blobporter will use this file to track the status of the transfer.\n\tIn case of failure and if the option is set the same status file, source files that were transferred will be skipped.\n\tIf the transfer is successful a summary will be created at then."
4849
)
4950

5051
flag.Usage = func() {
@@ -67,6 +68,7 @@ func init() {
6768
util.PrintUsageDefaults("h", "handles_per_file", strconv.Itoa(argsUtil.args.numberOfHandlesPerFile), numberOfHandlersPerFileMsg)
6869
util.PrintUsageDefaults("x", "files_per_transfer", strconv.Itoa(argsUtil.args.numberOfFilesInBatch), numberOfFilesInBatchMsg)
6970
util.PrintUsageDefaults("o", "read_token_exp", strconv.Itoa(defaultReadTokenExp), readTokenExpMsg)
71+
util.PrintUsageDefaults("l", "transfer_status", "", transferStatusFileMsg)
7072
}
7173

7274
util.StringListVarAlias(&argsUtil.args.sourceURIs, "f", "source_file", "", fileMsg)
@@ -88,7 +90,7 @@ func init() {
8890
util.IntVarAlias(&argsUtil.args.numberOfHandlesPerFile, "h", "handles_per_file", defaultNumberOfHandlesPerFile, numberOfHandlersPerFileMsg)
8991
util.IntVarAlias(&argsUtil.args.numberOfFilesInBatch, "x", "files_per_transfer", defaultNumberOfFilesInBatch, numberOfFilesInBatchMsg)
9092
util.IntVarAlias(&argsUtil.args.readTokenExp, "o", "read_token_exp", defaultReadTokenExp, readTokenExpMsg)
91-
93+
util.StringVarAlias(&argsUtil.args.transferStatusPath, "l", "transfer_status", "", transferStatusFileMsg)
9294
}
9395

9496
var dataTransferred uint64
@@ -144,18 +146,24 @@ func main() {
144146

145147
sourcesInfo := sourcePipeline.Source.GetSourcesInfo()
146148

147-
tfer := transfer.NewTransfer(&sourcePipeline.Source, &targetPipeline, argsUtil.params.numberOfReaders, argsUtil.params.numberOfWorkers, argsUtil.params.blockSize)
149+
tfer := transfer.NewTransfer(sourcePipeline.Source, targetPipeline, argsUtil.params.numberOfReaders, argsUtil.params.numberOfWorkers, argsUtil.params.blockSize)
150+
tfer.SetTransferTracker(argsUtil.params.tracker)
148151

149152
displayFilesToTransfer(sourcesInfo)
150153
pb := getProgressBarDelegate(tfer.TotalSize, argsUtil.params.quietMode)
151154

152155
tfer.StartTransfer(argsUtil.params.dedupeLevel, pb)
153-
154156
tfer.WaitForCompletion()
155157

156158
stats.AddTransferInfo(tfer.GetStats())
157159
}
158160

161+
if argsUtil.params.tracker != nil {
162+
if err = argsUtil.params.tracker.TrackTransferComplete(); err != nil {
163+
log.Fatal(err)
164+
}
165+
}
166+
159167
stats.DisplaySummary()
160168

161169
}

internal/const.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
)
77

88
//ProgramVersion blobporter version
9-
const ProgramVersion = "0.6.10"
9+
const ProgramVersion = "0.6.11"
1010

1111
//HTTPClientTimeout HTTP client timeout when reading from HTTP sources and try timeout for blob storage operations.
1212
var HTTPClientTimeout = 90

0 commit comments

Comments
 (0)