Skip to content

Commit a64d3ce

Browse files
committed
- Listing of source objects no longer blocks the transfer.
- S3 folders are not counted in the batch totals.
- Dial connection errors are retried.
- TCP connections are closed for small requests.
1 parent 83d99b5 commit a64d3ce

File tree

14 files changed

+417
-282
lines changed

14 files changed

+417
-282
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ Sources and targets are decoupled, this design enables the composition of variou
2929
Download, extract and set permissions:
3030

3131
```bash
32-
wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.6.09/bp_linux.tar.gz
32+
wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.6.10/bp_linux.tar.gz
3333
tar -xvf bp_linux.tar.gz linux_amd64/blobporter
3434
chmod +x ~/linux_amd64/blobporter
3535
cd ~/linux_amd64
@@ -46,7 +46,7 @@ export ACCOUNT_KEY=<STORAGE_ACCOUNT_KEY>
4646
4747
### Windows
4848

49-
Download [BlobPorter.exe](https://github.com/Azure/blobporter/releases/download/v0.6.09/bp_windows.zip)
49+
Download [BlobPorter.exe](https://github.com/Azure/blobporter/releases/download/v0.6.10/bp_windows.zip)
5050

5151
Set environment variables (if using the command prompt):
5252

blobporter.go

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,12 @@ import (
1010
"strconv"
1111
"sync/atomic"
1212

13+
"github.com/Azure/blobporter/internal"
1314
"github.com/Azure/blobporter/pipeline"
1415
"github.com/Azure/blobporter/transfer"
1516
"github.com/Azure/blobporter/util"
16-
"github.com/Azure/blobporter/internal"
1717
)
1818

19-
2019
var argsUtil paramParserValidator
2120

2221
func init() {
@@ -95,33 +94,29 @@ func init() {
9594
var dataTransferred uint64
9695
var targetRetries int32
9796

98-
func displayFilesToTransfer(sourcesInfo []pipeline.SourceInfo, numOfBatches int, batchNumber int) {
99-
if numOfBatches == 1 {
100-
fmt.Printf("Files to Transfer (%v) :\n", argsUtil.params.transferType)
101-
var totalSize uint64
102-
summary := ""
103-
104-
for _, source := range sourcesInfo {
105-
//if the source is URL, remove the QS
106-
display := source.SourceName
107-
if u, err := url.Parse(source.SourceName); err == nil {
108-
display = fmt.Sprintf("%v%v", u.Hostname(), u.Path)
109-
}
110-
summary = summary + fmt.Sprintf("Source: %v Size:%v \n", display, source.Size)
111-
totalSize = totalSize + source.Size
112-
}
97+
func displayFilesToTransfer(sourcesInfo []pipeline.SourceInfo) {
98+
fmt.Printf("\nFiles to Transfer (%v) :\n", argsUtil.params.transferType)
99+
var totalSize uint64
100+
summary := ""
113101

114-
if len(sourcesInfo) < 10 {
115-
fmt.Printf(summary)
116-
return
102+
for _, source := range sourcesInfo {
103+
//if the source is URL, remove the QS
104+
display := source.SourceName
105+
if u, err := url.Parse(source.SourceName); err == nil {
106+
display = fmt.Sprintf("%v%v", u.Hostname(), u.Path)
117107
}
108+
summary = summary + fmt.Sprintf("Source: %v Size:%v \n", display, source.Size)
109+
totalSize = totalSize + source.Size
110+
}
118111

119-
fmt.Printf("%v files. Total size:%v\n", len(sourcesInfo), totalSize)
120-
112+
if len(sourcesInfo) < 10 {
113+
fmt.Printf(summary)
121114
return
122115
}
123116

124-
fmt.Printf("\nBatch transfer (%v).\nFiles per Batch: %v.\nBatch: %v of %v\n ", argsUtil.params.transferType, len(sourcesInfo), batchNumber+1, numOfBatches)
117+
fmt.Printf("%v files. Total size:%v\n", len(sourcesInfo), totalSize)
118+
119+
return
125120
}
126121

127122
func main() {
@@ -141,12 +136,17 @@ func main() {
141136

142137
stats := transfer.NewStats(argsUtil.params.numberOfWorkers, argsUtil.params.numberOfReaders)
143138

144-
for b, sourcePipeline := range sourcePipelines {
145-
sourcesInfo := sourcePipeline.GetSourcesInfo()
139+
for sourcePipeline := range sourcePipelines {
140+
141+
if sourcePipeline.Err != nil {
142+
log.Fatal(sourcePipeline.Err)
143+
}
144+
145+
sourcesInfo := sourcePipeline.Source.GetSourcesInfo()
146146

147-
tfer := transfer.NewTransfer(&sourcePipeline, &targetPipeline, argsUtil.params.numberOfReaders, argsUtil.params.numberOfWorkers, argsUtil.params.blockSize)
147+
tfer := transfer.NewTransfer(&sourcePipeline.Source, &targetPipeline, argsUtil.params.numberOfReaders, argsUtil.params.numberOfWorkers, argsUtil.params.blockSize)
148148

149-
displayFilesToTransfer(sourcesInfo, len(sourcePipelines), b)
149+
displayFilesToTransfer(sourcesInfo)
150150
pb := getProgressBarDelegate(tfer.TotalSize, argsUtil.params.quietMode)
151151

152152
tfer.StartTransfer(argsUtil.params.dedupeLevel, pb)

internal/azutil.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"os"
1212
"syscall"
1313
"time"
14-
14+
"github.com/Azure/blobporter/util"
1515
"github.com/Azure/azure-pipeline-go/pipeline"
1616

1717
"github.com/Azure/azure-storage-blob-go/2016-05-31/azblob"
@@ -190,6 +190,7 @@ func (p *AzUtil) PutBlockBlob(blobName string, body io.ReadSeeker, md5 []byte) e
190190

191191
h := azblob.BlobHTTPHeaders{}
192192

193+
193194
//16 is md5.Size
194195
if len(md5) != 16 {
195196
var md5bytes [16]byte
@@ -375,6 +376,24 @@ func isWinsockTimeOutError(err error) net.Error {
375376
return nil
376377
}
377378

379+
func isDialConnectError(err error) net.Error {
380+
if uerr, ok := err.(*url.Error); ok {
381+
if derr, ok := uerr.Err.(*net.OpError); ok {
382+
if serr, ok := derr.Err.(*os.SyscallError); ok && serr.Syscall == "connect" {
383+
return &retriableError{error: err}
384+
}
385+
}
386+
}
387+
return nil
388+
}
389+
390+
func isRetriableDialError(err error) net.Error {
391+
if derr := isWinsockTimeOutError(err); derr != nil {
392+
return derr
393+
}
394+
return isDialConnectError(err)
395+
}
396+
378397
type retriableError struct {
379398
error
380399
}
@@ -387,11 +406,19 @@ func (*retriableError) Temporary() bool {
387406
return true
388407
}
389408

409+
const tcpKeepOpenMinLength = 8 * int64(util.MB)
410+
390411
func (p *clientPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
391-
r, err := pipelineHTTPClient.Do(request.WithContext(ctx))
412+
req := request.WithContext(ctx)
413+
414+
if req.ContentLength < tcpKeepOpenMinLength {
415+
req.Close=true
416+
}
417+
418+
r, err := pipelineHTTPClient.Do(req)
392419
pipresp := pipeline.NewHTTPResponse(r)
393420
if err != nil {
394-
if derr := isWinsockTimeOutError(err); derr != nil {
421+
if derr := isRetriableDialError(err); derr != nil {
395422
return pipresp, derr
396423
}
397424
err = pipeline.NewError(err, "HTTP request failed")
@@ -411,9 +438,9 @@ func newpipelineHTTPClient() *http.Client {
411438
KeepAlive: 30 * time.Second,
412439
DualStack: true,
413440
}).Dial,
414-
MaxIdleConns: 0,
441+
MaxIdleConns: 100,
415442
MaxIdleConnsPerHost: 100,
416-
IdleConnTimeout: 90 * time.Second,
443+
IdleConnTimeout: 60 * time.Second,
417444
TLSHandshakeTimeout: 10 * time.Second,
418445
ExpectContinueTimeout: 1 * time.Second,
419446
DisableKeepAlives: false,

internal/const.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
)
77

88
//ProgramVersion blobporter version
9-
const ProgramVersion = "0.6.09"
9+
const ProgramVersion = "0.6.10"
1010

1111
//HTTPClientTimeout HTTP client timeout when reading from HTTP sources and try timeout for blob storage operations.
1212
var HTTPClientTimeout = 90

pipelinefactory.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ import (
99
"github.com/Azure/blobporter/transfer"
1010
)
1111

12-
func newTransferPipelines(params *validatedParameters) ([]pipeline.SourcePipeline, pipeline.TargetPipeline, error) {
12+
func newTransferPipelines(params *validatedParameters) (<-chan sources.FactoryResult, pipeline.TargetPipeline, error) {
1313

1414
fact := newPipelinesFactory(params)
1515

16-
var sourcesp []pipeline.SourcePipeline
16+
var sourcesp <-chan sources.FactoryResult
1717
var targetp pipeline.TargetPipeline
1818
var err error
1919

@@ -72,7 +72,7 @@ func (p *pipelinesFactory) newTargetPipeline() (pipeline.TargetPipeline, error)
7272
return nil, fmt.Errorf("Invalid target segment:%v", p.target)
7373
}
7474

75-
func (p *pipelinesFactory) newSourcePipelines() ([]pipeline.SourcePipeline, error) {
75+
func (p *pipelinesFactory) newSourcePipelines() (<-chan sources.FactoryResult, error) {
7676

7777
params, err := p.newSourceParams()
7878

@@ -83,18 +83,19 @@ func (p *pipelinesFactory) newSourcePipelines() ([]pipeline.SourcePipeline, erro
8383
switch p.source {
8484
case transfer.File:
8585
params := params.(sources.FileSystemSourceParams)
86-
return sources.NewFileSystemSourcePipeline(&params), nil
86+
return sources.NewFileSystemSourcePipelineFactory(&params), nil
8787
case transfer.HTTP:
8888
params := params.(sources.HTTPSourceParams)
89-
return []pipeline.SourcePipeline{sources.NewHTTPSourcePipeline(params.SourceURIs, params.TargetAliases, params.SourceParams.CalculateMD5)}, nil
89+
return sources.NewHTTPSourcePipelineFactory(params), nil
9090
case transfer.S3:
9191
params := params.(sources.S3Params)
92-
return sources.NewS3SourcePipeline(&params), nil
92+
return sources.NewS3SourcePipelineFactory(&params), nil
9393
case transfer.Blob:
9494
params := params.(sources.AzureBlobParams)
95-
return sources.NewAzureBlobSourcePipeline(&params), nil
95+
return sources.NewAzBlobSourcePipelineFactory(&params), nil
9696
case transfer.Perf:
97-
return sources.NewPerfSourcePipeline(params.(sources.PerfSourceParams)), nil
97+
params := params.(sources.PerfSourceParams)
98+
return sources.NewPerfSourcePipelineFactory(params), nil
9899
}
99100

100101
return nil, fmt.Errorf("Invalid source segment:%v", p.source)

sources/azblobinfo.go

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package sources
22

33
import (
4-
"fmt"
54
"log"
65
"path"
76
"time"
@@ -39,35 +38,52 @@ func newazBlobInfoProvider(params *AzureBlobParams) *azBlobInfoProvider {
3938
return &azBlobInfoProvider{params: params, azUtil: azutil}
4039
}
4140

42-
//getSourceInfo gets a list of SourceInfo that represent the list of azure blobs returned by the service
43-
// based on the provided criteria (container/prefix). If the exact match flag is set, then a specific match is
44-
// performed instead of the prefix. Marker semantics are also honored so a complete list is expected
45-
func (b *azBlobInfoProvider) getSourceInfo() ([]pipeline.SourceInfo, error) {
46-
var err error
41+
func (b *azBlobInfoProvider) toSourceInfo(obj *azblob.Blob) (*pipeline.SourceInfo, error) {
4742
exp := b.params.SasExp
4843
if exp == 0 {
4944
exp = defaultSasExpHours
5045
}
5146
date := time.Now().Add(time.Duration(exp) * time.Minute).UTC()
52-
sourceURIs := make([]pipeline.SourceInfo, 0)
47+
48+
sourceURLWithSAS := b.azUtil.GetBlobURLWithReadOnlySASToken(obj.Name, date)
49+
50+
targetAlias := obj.Name
51+
if !b.params.KeepDirStructure {
52+
targetAlias = path.Base(obj.Name)
53+
}
54+
55+
return &pipeline.SourceInfo{
56+
SourceName: sourceURLWithSAS.String(),
57+
Size: uint64(*obj.Properties.ContentLength),
58+
TargetAlias: targetAlias}, nil
59+
}
60+
61+
func (b *azBlobInfoProvider) listObjects(filter SourceFilter) <-chan ObjectListingResult {
62+
sources := make(chan ObjectListingResult, 2)
63+
list := make([]pipeline.SourceInfo, 0)
64+
bsize := 0
5365

5466
blobCallback := func(blob *azblob.Blob, prefix string) (bool, error) {
5567
include := true
5668
if b.params.UseExactNameMatch {
5769
include = blob.Name == prefix
5870
}
59-
if include {
60-
sourceURLWithSAS := b.azUtil.GetBlobURLWithReadOnlySASToken(blob.Name, date)
71+
if include && filter.IsIncluded(blob.Name) {
72+
73+
si, err := b.toSourceInfo(blob)
74+
75+
if err != nil {
76+
return true, err
77+
}
78+
79+
list = append(list, *si)
6180

62-
targetAlias := blob.Name
63-
if !b.params.KeepDirStructure {
64-
targetAlias = path.Base(blob.Name)
81+
if bsize++; bsize == b.params.FilesPerPipeline {
82+
sources <- ObjectListingResult{Sources: list}
83+
list = make([]pipeline.SourceInfo, 0)
84+
bsize = 0
6585
}
6686

67-
sourceURIs = append(sourceURIs, pipeline.SourceInfo{
68-
SourceName: sourceURLWithSAS.String(),
69-
Size: uint64(*blob.Properties.ContentLength),
70-
TargetAlias: targetAlias})
7187
if b.params.UseExactNameMatch {
7288
//true, stops iteration
7389
return true, nil
@@ -78,19 +94,21 @@ func (b *azBlobInfoProvider) getSourceInfo() ([]pipeline.SourceInfo, error) {
7894
return false, nil
7995
}
8096

81-
for _, blobName := range b.params.BlobNames {
82-
if err = b.azUtil.IterateBlobList(blobName, blobCallback); err != nil {
83-
return nil, err
97+
go func() {
98+
for _, blobName := range b.params.BlobNames {
99+
if err := b.azUtil.IterateBlobList(blobName, blobCallback); err != nil {
100+
sources <- ObjectListingResult{Err: err}
101+
return
102+
}
103+
if bsize > 0 {
104+
sources <- ObjectListingResult{Sources: list}
105+
list = make([]pipeline.SourceInfo, 0)
106+
bsize = 0
107+
}
84108
}
85-
}
109+
close(sources)
86110

87-
if len(sourceURIs) == 0 {
88-
nameMatchMode := "prefix"
89-
if b.params.UseExactNameMatch {
90-
nameMatchMode = "name"
91-
}
92-
return nil, fmt.Errorf(" the %v %s did not match any blob names ", nameMatchMode, b.params.BlobNames)
93-
}
111+
}()
94112

95-
return sourceURIs, nil
113+
return sources
96114
}

0 commit comments

Comments
 (0)