Skip to content

Commit 50b88ac

Browse files
committed
0.6.20rc
1 parent eb11447 commit 50b88ac

File tree

156 files changed

+20382
-13499
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

156 files changed

+20382
-13499
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ _build/linux_amd64
1212
_build/windows_amd64
1313
_wd
1414
_old
15+
_vendor*
1516
.vscode
1617
*_testdata
1718

.travis.yml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
sudo: required
22
language: go
33
go:
4-
- 1.9
4+
- 1.11
55
before_install:
66
- sudo apt-get install zip
7-
- go get github.com/tools/godep
7+
# Download the binary to bin folder in $GOPATH
8+
- curl -L -s https://github.com/golang/dep/releases/download/v${DEP_VERSION}/dep-linux-amd64 -o $GOPATH/bin/dep
9+
# Make the binary executable
10+
- chmod +x $GOPATH/bin/dep
811
- go get -d github.com/stretchr/testify/assert
912
install:
10-
- godep restore
13+
# - dep ensure
1114
script:
1215
- go test -v ./...
1316
- mkdir linux_amd64

Gopkg.lock

Lines changed: 122 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Gopkg.toml example
2+
#
3+
# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html
4+
# for detailed Gopkg.toml documentation.
5+
#
6+
# required = ["github.com/user/thing/cmd/thing"]
7+
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
8+
#
9+
# [[constraint]]
10+
# name = "github.com/user/project"
11+
# version = "1.0.0"
12+
#
13+
# [[constraint]]
14+
# name = "github.com/user/project2"
15+
# branch = "dev"
16+
# source = "github.com/myfork/project2"
17+
#
18+
# [[override]]
19+
# name = "github.com/x/y"
20+
# version = "2.4.0"
21+
#
22+
# [prune]
23+
# non-go = false
24+
# go-tests = true
25+
# unused-packages = true
26+
27+
28+
[[constraint]]
29+
name = "github.com/Azure/azure-pipeline-go"
30+
version = "0.1.7"
31+
32+
[[constraint]]
33+
branch = "master"
34+
name = "github.com/Azure/azure-storage-blob-go"
35+
36+
[[constraint]]
37+
name = "github.com/minio/minio-go"
38+
version = "4.0.6"
39+
40+
[[constraint]]
41+
name = "github.com/stretchr/testify"
42+
version = "1.2.2"
43+
44+
[prune]
45+
go-tests = true
46+
unused-packages = true

args.go

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ type paramParserValidator struct {
4343
params *validatedParameters
4444
sourceSegment transfer.TransferSegment
4545
targetSegment transfer.TransferSegment
46+
defaultValues defaults
47+
}
48+
49+
type defaults struct {
50+
defaultNumberOfReaders int
51+
defaultNumberOfWorkers int
4652
}
4753

4854
//respresents raw (not validated) the list of options/flags as received from the user
@@ -96,6 +102,7 @@ type validatedParameters struct {
96102
blobTarget blobParams
97103
perfSourceDefinitions []sources.SourceDefinition
98104
tracker *internal.TransferTracker
105+
referenceMode bool
99106
}
100107

101108
type s3Source struct {
@@ -108,12 +115,13 @@ type s3Source struct {
108115
}
109116

110117
type blobParams struct {
111-
accountName string
112-
accountKey string
113-
container string
114-
prefixes []string
115-
baseBlobURL string
116-
sasExpMin int
118+
accountName string
119+
accountKey string
120+
container string
121+
prefixes []string
122+
baseBlobURL string
123+
sasExpMin int
124+
useServerSide bool
117125
}
118126

119127
func (b blobParams) isSourceAuthAndContainerInfoSet() bool {
@@ -143,21 +151,28 @@ func newParamParserValidator() paramParserValidator {
143151
transferDefStr: string(transfer.FileToBlock),
144152
numberOfHandlesPerFile: defaultNumberOfHandlesPerFile,
145153
hTTPClientTimeout: defaultHTTPClientTimeout,
146-
numberOfFilesInBatch: defaultNumberOfFilesInBatch}
154+
numberOfFilesInBatch: defaultNumberOfFilesInBatch,
155+
}
147156
params := &validatedParameters{
148157
s3Source: s3Source{
149158
preSignedExpMin: defaultReadTokenExp},
150159
blobSource: blobParams{
151160
sasExpMin: defaultReadTokenExp},
152161
blobTarget: blobParams{}}
153162

154-
p := paramParserValidator{args: args, params: params}
163+
p := paramParserValidator{args: args,
164+
params: params,
165+
defaultValues: defaults{
166+
defaultNumberOfReaders: defaultNumberOfReaders,
167+
defaultNumberOfWorkers: defaultNumberOfWorkers,
168+
},
169+
}
155170

156171
return p
157172
}
158173

159174
func (p *paramParserValidator) parseAndValidate() error {
160-
//Run global rules.. this will set the transfer type
175+
//Run global rules.. this will set the transfer type which is needed in some of the global rules.
161176
err := p.pvgTransferType()
162177
if err != nil {
163178
return err
@@ -166,6 +181,7 @@ func (p *paramParserValidator) parseAndValidate() error {
166181
p.sourceSegment = s
167182
p.targetSegment = t
168183
err = p.runParseAndValidationRules(
184+
p.pvgSetReferenceModeAndServerSide,
169185
p.pvgCalculateReadersAndWorkers,
170186
p.pvgTransferStatusPathIsPresent,
171187
p.pvgBatchLimits,
@@ -180,7 +196,7 @@ func (p *paramParserValidator) parseAndValidate() error {
180196
return err
181197
}
182198

183-
//get and run target rules...
199+
//get and run source and target rules...
184200
var rules []parseAndValidationRule
185201
if rules, err = p.getSourceRules(); err != nil {
186202
return err
@@ -260,6 +276,16 @@ func (p *paramParserValidator) getSourceRules() ([]parseAndValidationRule, error
260276
//**************************
261277

262278
//Global rules....
279+
func (p *paramParserValidator) pvgSetReferenceModeAndServerSide() error {
280+
if p.targetSegment == transfer.BlockBlob {
281+
if p.sourceSegment == transfer.Blob {
282+
p.params.blobTarget.useServerSide = true
283+
p.params.referenceMode = true
284+
}
285+
}
286+
return nil
287+
}
288+
263289
func (p *paramParserValidator) pvgUseExactMatch() error {
264290
p.params.useExactMatch = p.args.exactNameMatch
265291
return nil
@@ -302,6 +328,23 @@ func (p *paramParserValidator) pvgCalculateReadersAndWorkers() error {
302328
p.params.numberOfWorkers = p.args.numberOfWorkers
303329
p.params.numberOfReaders = p.args.numberOfReaders
304330

331+
//if using the default parameters and the transfer is blob-blockblob change
332+
// params to maximes the transfer considering that this will be server-side sync
333+
if p.defaultValues.defaultNumberOfReaders == p.params.numberOfReaders &&
334+
p.defaultValues.defaultNumberOfWorkers == p.params.numberOfWorkers {
335+
336+
if p.params.transferType == transfer.BlobToBlock {
337+
workers := 75 * runtime.NumCPU()
338+
339+
if workers > 200 {
340+
workers = 200
341+
}
342+
p.params.numberOfWorkers = workers
343+
p.params.numberOfReaders = 4
344+
}
345+
346+
}
347+
305348
return nil
306349

307350
}

blobporter.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -131,34 +131,29 @@ func main() {
131131
if err := argsUtil.parseAndValidate(); err != nil {
132132
log.Fatal(err)
133133
}
134-
135134
//Create pipelines
136135
sourcePipelines, targetPipeline, err := newTransferPipelines(argsUtil.params)
137136

138137
if err != nil {
139138
log.Fatal(err)
140139
}
141140

142-
stats := transfer.NewStats(argsUtil.params.numberOfWorkers, argsUtil.params.numberOfReaders)
143-
141+
prog := newProgressState(argsUtil.params.quietMode, argsUtil.params.numberOfReaders, argsUtil.params.numberOfWorkers)
144142
for sourcePipeline := range sourcePipelines {
145143

146144
if sourcePipeline.Err != nil {
147145
log.Fatal(sourcePipeline.Err)
148146
}
149147

150148
sourcesInfo := sourcePipeline.Source.GetSourcesInfo()
151-
152149
tfer := transfer.NewTransfer(sourcePipeline.Source, targetPipeline, argsUtil.params.numberOfReaders, argsUtil.params.numberOfWorkers, argsUtil.params.blockSize)
153150
tfer.SetTransferTracker(argsUtil.params.tracker)
154-
155-
displayFilesToTransfer(sourcesInfo)
156-
pb := getProgressBarDelegate(tfer.TotalSize, argsUtil.params.quietMode)
157-
158-
tfer.StartTransfer(argsUtil.params.dedupeLevel, pb)
151+
prog.newTransfer(float64(tfer.TotalSize), sourcesInfo, argsUtil.params.transferType)
152+
153+
tfer.StartTransfer(argsUtil.params.dedupeLevel)
159154
tfer.WaitForCompletion()
160155

161-
stats.AddTransferInfo(tfer.GetStats())
156+
162157
}
163158

164159
if argsUtil.params.tracker != nil {
@@ -167,24 +162,25 @@ func main() {
167162
}
168163
}
169164

170-
stats.DisplaySummary()
165+
prog.displayGlobalSummary()
171166

167+
172168
}
173169

174170
func getProgressBarDelegate(totalSize uint64, quietMode bool) func(r pipeline.WorkerResult, committedCount int, bufferLevel int) {
175171
dataTransferred = 0
176172
targetRetries = 0
177173
if quietMode || totalSize == 0 {
178174
return func(r pipeline.WorkerResult, committedCount int, bufferLevel int) {
179-
atomic.AddInt32(&targetRetries, int32(r.Stats.Retries))
180175
dataTransferred = dataTransferred + uint64(r.BlockSize)
181176
}
182177

183178
}
184179
return func(r pipeline.WorkerResult, committedCount int, bufferLevel int) {
185180

186-
atomic.AddInt32(&targetRetries, int32(r.Stats.Retries))
187-
181+
if r.Stats != nil {
182+
atomic.AddInt32(&targetRetries, int32(r.Stats.Retries))
183+
}
188184
dataTransferred = dataTransferred + uint64(r.BlockSize)
189185
p := int(math.Ceil((float64(dataTransferred) / float64(totalSize)) * 100))
190186
var ind string

0 commit comments

Comments
 (0)