
Commit 4322eed

Merge pull request #92 from Azure/dev
v.0.6.12
2 parents: 83d99b5 + 8989b63

28 files changed: 1,598 additions and 637 deletions

.gitignore

Lines changed: 3 additions & 0 deletions
@@ -2,15 +2,18 @@
 *.o
 *.a
 *.so
+__*.*
 
 # Folders
 _obj
 _test
+_build/
 _build/linux_amd64
 _build/windows_amd64
 _wd
 _old
 .vscode
+*_testdata
 
 # Architecture specific extensions/prefixes
 *.[568vq]

README.md

Lines changed: 2 additions & 2 deletions
@@ -29,7 +29,7 @@ Sources and targets are decoupled, this design enables the composition of variou
 Download, extract and set permissions:
 
 ```bash
-wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.6.09/bp_linux.tar.gz
+wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.6.12/bp_linux.tar.gz
 tar -xvf bp_linux.tar.gz linux_amd64/blobporter
 chmod +x ~/linux_amd64/blobporter
 cd ~/linux_amd64
@@ -46,7 +46,7 @@ export ACCOUNT_KEY=<STORAGE_ACCOUNT_KEY>
 
 ### Windows
 
-Download [BlobPorter.exe](https://github.com/Azure/blobporter/releases/download/v0.6.09/bp_windows.zip)
+Download [BlobPorter.exe](https://github.com/Azure/blobporter/releases/download/v0.6.12/bp_windows.zip)
 
 Set environment variables (if using the command prompt):
 
args.go

Lines changed: 32 additions & 3 deletions
@@ -72,6 +72,7 @@ type arguments struct {
 	readTokenExp int
 	numberOfHandlesPerFile int //numberOfHandlesPerFile = defaultNumberOfHandlesPerFile
 	numberOfFilesInBatch int //numberOfFilesInBatch = defaultNumberOfFilesInBatch
+	transferStatusPath string
 }
 
 //represents validated parameters
@@ -95,6 +96,7 @@ type validatedParameters struct {
 	blobSource blobParams
 	blobTarget blobParams
 	perfSourceDefinitions []sources.SourceDefinition
+	tracker *internal.TransferTracker
 }
 
 type s3Source struct {
@@ -166,12 +168,14 @@ func (p *paramParserValidator) parseAndValidate() error {
 	p.targetSegment = t
 	err = p.runParseAndValidationRules(
 		p.pvgCalculateReadersAndWorkers,
+		p.pvgTransferStatusPathIsPresent,
 		p.pvgBatchLimits,
 		p.pvgHTTPTimeOut,
 		p.pvgDupCheck,
 		p.pvgParseBlockSize,
 		p.pvgQuietMode,
-		p.pvgKeepDirectoryStructure)
+		p.pvgKeepDirectoryStructure,
+		p.pvgUseExactMatch)
 
 	if err != nil {
 		return err
@@ -254,6 +258,27 @@ func (p *paramParserValidator) getSourceRules() ([]parseAndValidationRule, error
 //**************************
 
 //Global rules....
+func (p *paramParserValidator) pvgUseExactMatch() error {
+	p.params.useExactMatch = p.args.exactNameMatch
+	return nil
+}
+
+func (p *paramParserValidator) pvgTransferStatusPathIsPresent() error {
+
+	if p.args.transferStatusPath != "" {
+		if !p.args.quietMode{
+			fmt.Printf("Transfer is resumable. Transfer status file:%v \n", p.args.transferStatusPath)
+		}
+		tracker, err := internal.NewTransferTracker(p.args.transferStatusPath)
+
+		if err != nil {
+			return err
+		}
+
+		p.params.tracker = tracker
+	}
+	return nil
+}
 func (p *paramParserValidator) pvgKeepDirectoryStructure() error {
 	p.params.keepDirStructure = !p.args.removeDirStructure
 	return nil
@@ -503,7 +528,7 @@ func (p *paramParserValidator) pvSourceInfoForS3IsReq() error {
 	burl, err := url.Parse(p.params.sourceURIs[0])
 
 	if err != nil {
-		return fmt.Errorf("Invalid S3 endpoint URL. Parsing error: %v.\nThe format is s3://[END_POINT]/[BUCKET]/[OBJECT]", err)
+		return fmt.Errorf("Invalid S3 endpoint URL. Parsing error: %v.\nThe format is s3://[END_POINT]/[BUCKET]/[PREFIX]", err)
 	}
 
 	p.params.s3Source.endpoint = burl.Hostname()
@@ -514,10 +539,14 @@
 
 	segments := strings.Split(burl.Path, "/")
 
+	if len(segments) < 2 {
+		return fmt.Errorf("Invalid S3 endpoint URL. Bucket not specified. The format is s3://[END_POINT]/[BUCKET]/[PREFIX]")
+	}
+
 	p.params.s3Source.bucket = segments[1]
 
 	if p.params.s3Source.bucket == "" {
-		return fmt.Errorf("Invalid source S3 URI. Bucket name could be parsed")
+		return fmt.Errorf("Invalid source S3 URI. Bucket name could not be parsed")
 	}
 
 	prefix := ""
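
The stricter S3 parsing above expects sources of the form s3://[END_POINT]/[BUCKET]/[PREFIX], and pvgTransferStatusPathIsPresent wires up the transfer tracker when a status file is given. A minimal invocation sketch for reference; only the -f (source_file) and -l (transfer_status) aliases appear in this commit, so the -t/-c options and the credential variables are assumptions based on the project README rather than this diff:

```bash
# Sketch only: the S3 URL shape matches the validated s3://[END_POINT]/[BUCKET]/[PREFIX] format.
# The AWS_* variables, -c (container) and -t (transfer type) are assumed, not defined in this diff.
export AWS_ACCESS_KEY_ID=<ACCESS_KEY_ID>
export AWS_SECRET_ACCESS_KEY=<SECRET_ACCESS_KEY>
export ACCOUNT_NAME=<STORAGE_ACCOUNT_NAME>
export ACCOUNT_KEY=<STORAGE_ACCOUNT_KEY>
./blobporter -f "s3://s3.amazonaws.com/mybucket/myprefix" -c mycontainer -t s3-blockblob -l ./s3copy.status
```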

blobporter.go

Lines changed: 37 additions & 29 deletions
@@ -10,13 +10,12 @@ import (
 	"strconv"
 	"sync/atomic"
 
+	"github.com/Azure/blobporter/internal"
 	"github.com/Azure/blobporter/pipeline"
 	"github.com/Azure/blobporter/transfer"
 	"github.com/Azure/blobporter/util"
-	"github.com/Azure/blobporter/internal"
 )
 
-
 var argsUtil paramParserValidator
 
 func init() {
@@ -46,6 +45,7 @@ func init() {
 		numberOfHandlersPerFileMsg = "Number of open handles for concurrent reads and writes per file."
 		numberOfFilesInBatchMsg = "Maximum number of files in a transfer.\n\tIf the number is exceeded new transfers are created"
 		readTokenExpMsg = "Expiration in minutes of the read-only access token that will be generated to read from S3 or Azure Blob sources."
+		transferStatusFileMsg = "Transfer status file location. If set, blobporter will use this file to track the status of the transfer.\n\tIn case of failure and if the option is set the same status file, source files that were transferred will be skipped.\n\tIf the transfer is successful a summary will be appended."
 	)
 
 	flag.Usage = func() {
@@ -68,6 +68,7 @@ func init() {
 		util.PrintUsageDefaults("h", "handles_per_file", strconv.Itoa(argsUtil.args.numberOfHandlesPerFile), numberOfHandlersPerFileMsg)
 		util.PrintUsageDefaults("x", "files_per_transfer", strconv.Itoa(argsUtil.args.numberOfFilesInBatch), numberOfFilesInBatchMsg)
 		util.PrintUsageDefaults("o", "read_token_exp", strconv.Itoa(defaultReadTokenExp), readTokenExpMsg)
+		util.PrintUsageDefaults("l", "transfer_status", "", transferStatusFileMsg)
 	}
 
 	util.StringListVarAlias(&argsUtil.args.sourceURIs, "f", "source_file", "", fileMsg)
@@ -89,39 +90,35 @@ func init() {
 	util.IntVarAlias(&argsUtil.args.numberOfHandlesPerFile, "h", "handles_per_file", defaultNumberOfHandlesPerFile, numberOfHandlersPerFileMsg)
 	util.IntVarAlias(&argsUtil.args.numberOfFilesInBatch, "x", "files_per_transfer", defaultNumberOfFilesInBatch, numberOfFilesInBatchMsg)
 	util.IntVarAlias(&argsUtil.args.readTokenExp, "o", "read_token_exp", defaultReadTokenExp, readTokenExpMsg)
-
+	util.StringVarAlias(&argsUtil.args.transferStatusPath, "l", "transfer_status", "", transferStatusFileMsg)
 }
 
 var dataTransferred uint64
 var targetRetries int32
 
-func displayFilesToTransfer(sourcesInfo []pipeline.SourceInfo, numOfBatches int, batchNumber int) {
-	if numOfBatches == 1 {
-		fmt.Printf("Files to Transfer (%v) :\n", argsUtil.params.transferType)
-		var totalSize uint64
-		summary := ""
-
-		for _, source := range sourcesInfo {
-			//if the source is URL, remove the QS
-			display := source.SourceName
-			if u, err := url.Parse(source.SourceName); err == nil {
-				display = fmt.Sprintf("%v%v", u.Hostname(), u.Path)
-			}
-			summary = summary + fmt.Sprintf("Source: %v Size:%v \n", display, source.Size)
-			totalSize = totalSize + source.Size
-		}
+func displayFilesToTransfer(sourcesInfo []pipeline.SourceInfo) {
+	fmt.Printf("\nFiles to Transfer (%v) :\n", argsUtil.params.transferType)
+	var totalSize uint64
+	summary := ""
 
-		if len(sourcesInfo) < 10 {
-			fmt.Printf(summary)
-			return
+	for _, source := range sourcesInfo {
+		//if the source is URL, remove the QS
+		display := source.SourceName
+		if u, err := url.Parse(source.SourceName); err == nil {
+			display = fmt.Sprintf("%v%v", u.Hostname(), u.Path)
 		}
+		summary = summary + fmt.Sprintf("Source: %v Size:%v \n", display, source.Size)
+		totalSize = totalSize + source.Size
+	}
 
-		fmt.Printf("%v files. Total size:%v\n", len(sourcesInfo), totalSize)
-
+	if len(sourcesInfo) < 10 {
+		fmt.Printf(summary)
 		return
 	}
 
-	fmt.Printf("\nBatch transfer (%v).\nFiles per Batch: %v.\nBatch: %v of %v\n ", argsUtil.params.transferType, len(sourcesInfo), batchNumber+1, numOfBatches)
+	fmt.Printf("%v files. Total size:%v\n", len(sourcesInfo), totalSize)
+
+	return
 }
 
 func main() {
@@ -141,21 +138,32 @@
 
 	stats := transfer.NewStats(argsUtil.params.numberOfWorkers, argsUtil.params.numberOfReaders)
 
-	for b, sourcePipeline := range sourcePipelines {
-		sourcesInfo := sourcePipeline.GetSourcesInfo()
+	for sourcePipeline := range sourcePipelines {
+
+		if sourcePipeline.Err != nil {
+			log.Fatal(sourcePipeline.Err)
+		}
+
+		sourcesInfo := sourcePipeline.Source.GetSourcesInfo()
 
-		tfer := transfer.NewTransfer(&sourcePipeline, &targetPipeline, argsUtil.params.numberOfReaders, argsUtil.params.numberOfWorkers, argsUtil.params.blockSize)
+		tfer := transfer.NewTransfer(sourcePipeline.Source, targetPipeline, argsUtil.params.numberOfReaders, argsUtil.params.numberOfWorkers, argsUtil.params.blockSize)
+		tfer.SetTransferTracker(argsUtil.params.tracker)
 
-		displayFilesToTransfer(sourcesInfo, len(sourcePipelines), b)
+		displayFilesToTransfer(sourcesInfo)
 		pb := getProgressBarDelegate(tfer.TotalSize, argsUtil.params.quietMode)
 
 		tfer.StartTransfer(argsUtil.params.dedupeLevel, pb)
-
 		tfer.WaitForCompletion()
 
 		stats.AddTransferInfo(tfer.GetStats())
 	}
 
+	if argsUtil.params.tracker != nil {
+		if err = argsUtil.params.tracker.TrackTransferComplete(); err != nil {
+			log.Fatal(err)
+		}
+	}
+
 	stats.DisplaySummary()
 
 }
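
The new -l/--transfer_status flag registered above drives the resumable-transfer behavior described in transferStatusFileMsg: with a status file set, source files already recorded as transferred are skipped on a retry, and a summary is appended once the transfer completes (TrackTransferComplete). A minimal upload sketch; -f and -l come from this diff, while -c and the ACCOUNT_* variables are assumptions based on the README:

```bash
# Sketch: resumable upload tracked in ./upload.status (only -f and -l are defined in this diff).
export ACCOUNT_NAME=<STORAGE_ACCOUNT_NAME>
export ACCOUNT_KEY=<STORAGE_ACCOUNT_KEY>
./blobporter -f "/data/*.csv" -c mycontainer -l ./upload.status
# Re-running the same command after a failure skips the files already recorded in upload.status.
```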

docs/bptransfer.png

46.3 KB

docs/conf.py

Lines changed: 5 additions & 3 deletions
@@ -15,12 +15,12 @@
 # import os
 # import sys
 # sys.path.insert(0, os.path.abspath('.'))
-
+import sphinx_bootstrap_theme
 
 # -- Project information -----------------------------------------------------
 
 project = u'BlobPorter'
-#copyright = u'2018, Jesus Aguilar'
+copyright = u'2018, BlobPorter Contributors'
 author = u'BlobPorter Contributors'
 
 # The short X.Y version
@@ -74,7 +74,9 @@
 # The theme to use for HTML and HTML Help pages. See the documentation for
 # a list of builtin themes.
 #
-html_theme = 'alabaster'
+html_theme = 'sphinx_rtd_theme'
+#html_theme = 'bootstrap'
+#html_theme_path = sphinx_bootstrap_theme.get_html_theme_path()
 
 # Theme options are theme-specific and customize the look and feel of a theme
 # further. For a list of options available for each theme, see the
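
Since conf.py now imports sphinx_bootstrap_theme and selects sphinx_rtd_theme as the HTML theme, both packages must be importable when the docs are built. A hedged local-build sketch; the output directory is an assumption, not something this commit defines:

```bash
# Sketch: install the themes referenced by docs/conf.py, then build the HTML docs.
pip install sphinx sphinx_rtd_theme sphinx_bootstrap_theme
sphinx-build -b html docs docs/_build/html   # output path is an assumption
```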
