
Commit ce7071e

- naming refactoring

1 parent: 8d41569

File tree: 10 files changed, +145 −149 lines

README.md

Lines changed: 4 additions & 0 deletions

```diff
@@ -208,6 +208,10 @@ By default files are downloaded to the same directory where you are running blob
 
 - `i`, `--remove_directories` *bool* if set blobs are downloaded or uploaded without keeping the directory structure of the source. Not applicable when the source is a HTTP endpoint.
 
+## Optimizing Transfers
+
+BlobPorter has several configuration options that can be used to maximize performance for specific use cases.
+
 ## Performance Considerations
 
 By default, BlobPorter creates 5 readers and 8 workers for each core on the computer. You can overwrite these values by using the options -r (number of readers) and -g (number of workers). When overriding these options there are a few considerations:
```
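For example, on a machine where CPU is not the bottleneck you might raise both values and measure throughput. A hedged sample invocation; the `-f` source pattern and `-c` container name here are illustrative placeholders, not prescribed values:

```
blobporter -f "/datadrive/images/*.png" -c images -r 20 -g 32
```

More readers generally help when the source has many files to read in parallel, while more workers raise the number of concurrent block uploads to the target.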

args.go

Lines changed: 1 addition & 2 deletions

```diff
@@ -205,8 +205,7 @@ func (p *paramParserValidator) getTargetRules() ([]parseAndValidationRule, error
         p.pvBlockSizeCheckForBlockBlobs,
         p.pvTargetBlobAuthInfoIsReq}, nil
     case transfer.Perf:
-        return []parseAndValidationRule{
-            p.pvPerfSourceIsReq}, nil
+        return []parseAndValidationRule{}, nil
     }
 
     return nil, fmt.Errorf("Invalid target segment type: %v ", p.targetSegment)
```
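With this change a transfer.Perf target validates against an empty rule set rather than requiring a perf source. Below is a minimal sketch of the rule-chain pattern the validator follows; the rule signature and the sample rule are simplified assumptions, not the real args.go API:

```go
package main

import "fmt"

// parseAndValidationRule mirrors the shape used in args.go: each rule
// returns an error when a parameter combination is invalid.
type parseAndValidationRule func() error

// runRules applies the rules in order and stops at the first failure; an
// empty slice, like the one the Perf target now returns, always passes.
func runRules(rules []parseAndValidationRule) error {
	for _, rule := range rules {
		if err := rule(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// Hypothetical stand-in for a rule such as pvBlockSizeCheckForBlockBlobs.
	blockSizeCheck := func() error {
		const blockSizeMB, maxBlockSizeMB = 8, 100
		if blockSizeMB > maxBlockSizeMB {
			return fmt.Errorf("block size %dMB exceeds max %dMB", blockSizeMB, maxBlockSizeMB)
		}
		return nil
	}

	fmt.Println(runRules([]parseAndValidationRule{blockSizeCheck})) // <nil>
	fmt.Println(runRules(nil))                                      // <nil>
}
```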

pipelinefactory.go

Lines changed: 9 additions & 9 deletions

```diff
@@ -60,11 +60,11 @@ func (p *pipelinesFactory) newTargetPipeline() (pipeline.TargetPipeline, error)
     switch p.target {
     case transfer.File:
         params := params.(targets.FileTargetParams)
-        return targets.NewMultiFile(params.Overwrite, params.NumberOfHandlesPerFile), nil
+        return targets.NewFileSystemTargetPipeline(params.Overwrite, params.NumberOfHandlesPerFile), nil
     case transfer.BlockBlob:
-        return targets.NewAzureBlockPipeline(params.(targets.AzureTargetParams)), nil
+        return targets.NewAzureBlockTargetPipeline(params.(targets.AzureTargetParams)), nil
     case transfer.PageBlob:
-        return targets.NewAzurePagePipeline(params.(targets.AzureTargetParams)), nil
+        return targets.NewAzurePageTargetPipeline(params.(targets.AzureTargetParams)), nil
     case transfer.Perf:
         return targets.NewPerfTargetPipeline(), nil
     }
@@ -82,17 +82,17 @@ func (p *pipelinesFactory) newSourcePipelines() ([]pipeline.SourcePipeline, erro
 
     switch p.source {
     case transfer.File:
-        params := params.(sources.MultiFileParams)
-        return sources.NewMultiFile(&params), nil
+        params := params.(sources.FileSystemSourceParams)
+        return sources.NewFileSystemSourcePipeline(&params), nil
     case transfer.HTTP:
         params := params.(sources.HTTPSourceParams)
-        return []pipeline.SourcePipeline{sources.NewHTTP(params.SourceURIs, params.TargetAliases, params.SourceParams.CalculateMD5)}, nil
+        return []pipeline.SourcePipeline{sources.NewHTTPSourcePipeline(params.SourceURIs, params.TargetAliases, params.SourceParams.CalculateMD5)}, nil
     case transfer.S3:
         params := params.(sources.S3Params)
-        return sources.NewS3Pipeline(&params), nil
+        return sources.NewS3SourcePipeline(&params), nil
     case transfer.Blob:
         params := params.(sources.AzureBlobParams)
-        return sources.NewAzureBlob(&params), nil
+        return sources.NewAzureBlobSourcePipeline(&params), nil
     case transfer.Perf:
         return sources.NewPerfSourcePipeline(params.(sources.PerfSourceParams)), nil
     }
@@ -103,7 +103,7 @@ func (p *pipelinesFactory) newSourceParams() (interface{}, error) {
 
     switch p.source {
     case transfer.File:
-        return sources.MultiFileParams{
+        return sources.FileSystemSourceParams{
             SourcePatterns: p.valParams.sourceURIs,
             BlockSize:      p.valParams.blockSize,
             TargetAliases:  p.valParams.targetAliases,
```
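After the rename, every branch of these switches maps a transfer segment to a constructor whose name states both the transport and its role (source or target). A compact, self-contained sketch of that factory shape, using stand-in types rather than the real pipeline interfaces:

```go
package main

import "fmt"

// sourcePipeline stands in for pipeline.SourcePipeline.
type sourcePipeline interface{ name() string }

type fileSystemSource struct{}

func (fileSystemSource) name() string { return "FileSystemSourcePipeline" }

type httpSource struct{}

func (httpSource) name() string { return "HTTPSourcePipeline" }

// newSourcePipeline mirrors the switch in newSourcePipelines: each case
// maps a transfer segment to the matching constructor.
func newSourcePipeline(segment string) (sourcePipeline, error) {
	switch segment {
	case "file":
		return fileSystemSource{}, nil
	case "http":
		return httpSource{}, nil
	}
	return nil, fmt.Errorf("invalid source segment type: %v", segment)
}

func main() {
	p, err := newSourcePipeline("http")
	fmt.Println(p.name(), err) // HTTPSourcePipeline <nil>
}
```

Because every constructor returns the shared interface, supporting a new transport means adding one case to the switch rather than touching the callers.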

sources/http.go

Lines changed: 27 additions & 27 deletions

```diff
@@ -22,14 +22,14 @@ import (
 
 const sasTokenNumberOfHours = 4
 
-// HTTPPipeline constructs parts channel and implements data readers for files exposed via HTTP
-type HTTPPipeline struct {
+// HTTPSource constructs parts channel and implements data readers for files exposed via HTTP
+type HTTPSource struct {
     Sources    []pipeline.SourceInfo
     HTTPClient *http.Client
     includeMD5 bool
 }
 
-type sourceHTTPPipelineFactory func(httpSource HTTPPipeline) (pipeline.SourcePipeline, error)
+type sourceHTTPPipelineFactory func(httpSource HTTPSource) (pipeline.SourcePipeline, error)
 
 func newHTTPSource(sourceListManager objectListManager, pipelineFactory sourceHTTPPipelineFactory, numOfFilePerPipeline int, includeMD5 bool) ([]pipeline.SourcePipeline, error) {
     var err error
@@ -57,7 +57,7 @@ func newHTTPSource(sourceListManager objectListManager, pipelineFactory sourceHT
         numOfFilesInBatch = filesSent
     }
 
-    httpSource := HTTPPipeline{Sources: sourceInfos[start : start+numOfFilesInBatch], HTTPClient: httpSourceHTTPClient, includeMD5: includeMD5}
+    httpSource := HTTPSource{Sources: sourceInfos[start : start+numOfFilesInBatch], HTTPClient: httpSourceHTTPClient, includeMD5: includeMD5}
 
     pipelines[b], err = pipelineFactory(httpSource)
 
@@ -71,9 +71,9 @@ func newHTTPSource(sourceListManager objectListManager, pipelineFactory sourceHT
     return pipelines, err
 }
 
-//NewHTTP creates a new instance of an HTTP source
+//NewHTTPSourcePipeline creates a new instance of an HTTP source
 //To get the file size, a HTTP HEAD request is issued and the Content-Length header is inspected.
-func NewHTTP(sourceURIs []string, targetAliases []string, md5 bool) pipeline.SourcePipeline {
+func NewHTTPSourcePipeline(sourceURIs []string, targetAliases []string, md5 bool) pipeline.SourcePipeline {
     setTargetAlias := len(sourceURIs) == len(targetAliases)
     sources := make([]pipeline.SourceInfo, len(sourceURIs))
     for i := 0; i < len(sourceURIs); i++ {
@@ -94,7 +94,7 @@ func NewHTTP(sourceURIs []string, targetAliases []string, md5 bool) pipeline.Sou
             TargetAlias: targetAlias,
             SourceName:  sourceURIs[i]}
     }
-    return &HTTPPipeline{Sources: sources, HTTPClient: httpSourceHTTPClient, includeMD5: md5}
+    return &HTTPSource{Sources: sources, HTTPClient: httpSourceHTTPClient, includeMD5: md5}
 }
 
 func getSourceSize(sourceURI string) (size int) {
@@ -163,14 +163,14 @@ func getSourceSizeFromByteRangeHeader(sourceURI string) (size int) {
 
 //GetSourcesInfo implements GetSourcesInfo from the pipeline.SourcePipeline Interface.
 //Returns an array of pipeline.SourceInfo[] with the files URL, alias and size.
-func (f *HTTPPipeline) GetSourcesInfo() []pipeline.SourceInfo {
+func (f *HTTPSource) GetSourcesInfo() []pipeline.SourceInfo {
     return f.Sources
 }
 
 //ExecuteReader implements ExecuteReader from the pipeline.SourcePipeline Interface.
 //For each part the reader makes a byte range request to the source
 // starting from the part's Offset to BytesToRead - 1 (zero based).
-func (f *HTTPPipeline) ExecuteReader(partitionsQ chan pipeline.PartsPartition, partsQ chan pipeline.Part, readPartsQ chan pipeline.Part, id int, wg *sync.WaitGroup) {
+func (f *HTTPSource) ExecuteReader(partitionsQ chan pipeline.PartsPartition, partsQ chan pipeline.Part, readPartsQ chan pipeline.Part, id int, wg *sync.WaitGroup) {
     var blocksHandled = 0
     var err error
     var req *http.Request
@@ -239,7 +239,7 @@ func (f *HTTPPipeline) ExecuteReader(partitionsQ chan pipeline.PartsPartition, p
 
 //ConstructBlockInfoQueue implements GetSourcesInfo from the pipeline.SourcePipeline Interface.
 //Constructs the Part's channel arithmetically from the size of the sources.
-func (f *HTTPPipeline) ConstructBlockInfoQueue(blockSize uint64) (partitionsQ chan pipeline.PartsPartition, partsQ chan pipeline.Part, numOfBlocks int, size uint64) {
+func (f *HTTPSource) ConstructBlockInfoQueue(blockSize uint64) (partitionsQ chan pipeline.PartsPartition, partsQ chan pipeline.Part, numOfBlocks int, size uint64) {
     allParts := make([][]pipeline.Part, len(f.Sources))
     //disable memory buffer for parts (bufferQ == nil)
     var bufferQ chan []byte
@@ -274,23 +274,23 @@ const (
     maxIdleConnsPerHost = 50
 )
 
-var httpSourceHTTPClient = newSourceHTTPClient()
+var httpSourceHTTPClient = newSourceHTTPClient()
 
 func newSourceHTTPClient() *http.Client {
-    return &http.Client{
-        Timeout: time.Duration(util.HTTPClientTimeout) * time.Second,
-        Transport: &http.Transport{
-            Dial: (&net.Dialer{
-                Timeout:   30 * time.Second, // dial timeout
-                KeepAlive: 30 * time.Second,
-                DualStack: true,
-            }).Dial,
-            MaxIdleConns:           maxIdleConns,
-            MaxIdleConnsPerHost:    maxIdleConnsPerHost,
-            IdleConnTimeout:        90 * time.Second,
-            TLSHandshakeTimeout:    30 * time.Second,
-            ExpectContinueTimeout:  1 * time.Second,
-            DisableKeepAlives:      false,
-            DisableCompression:     false,
-            MaxResponseHeaderBytes: 0}}
+    return &http.Client{
+        Timeout: time.Duration(util.HTTPClientTimeout) * time.Second,
+        Transport: &http.Transport{
+            Dial: (&net.Dialer{
+                Timeout:   30 * time.Second, // dial timeout
+                KeepAlive: 30 * time.Second,
+                DualStack: true,
+            }).Dial,
+            MaxIdleConns:           maxIdleConns,
+            MaxIdleConnsPerHost:    maxIdleConnsPerHost,
+            IdleConnTimeout:        90 * time.Second,
+            TLSHandshakeTimeout:    30 * time.Second,
+            ExpectContinueTimeout:  1 * time.Second,
+            DisableKeepAlives:      false,
+            DisableCompression:     false,
+            MaxResponseHeaderBytes: 0}}
 }
```

The final hunk only re-indents newSourceHTTPClient; the statements themselves are unchanged.
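Per the comments above, ExecuteReader issues one zero-based, inclusive byte-range GET per part. A minimal sketch of that request pattern; the helper name, URI, and error handling are illustrative, and the real reader also handles retries, MD5 calculation, and target aliases:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

// readRange sketches the per-part read: request the zero-based, inclusive
// range [offset, offset+count-1] and return the response body.
func readRange(client *http.Client, uri string, offset, count uint64) ([]byte, error) {
	req, err := http.NewRequest("GET", uri, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+count-1))

	res, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()
	if res.StatusCode != http.StatusPartialContent {
		return nil, fmt.Errorf("expected 206 Partial Content, got %v", res.Status)
	}
	return io.ReadAll(res.Body)
}

func main() {
	// Hypothetical source URI; any server that honors Range headers works.
	data, err := readRange(http.DefaultClient, "https://example.com/bigfile.bin", 0, 1<<20)
	fmt.Println(len(data), err)
}
```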

sources/multifile.go

Lines changed: 13 additions & 14 deletions

```diff
@@ -17,11 +17,11 @@ import (
 )
 
 ////////////////////////////////////////////////////////////
-///// MultiFilePipeline
+///// FileSystemSource
 ////////////////////////////////////////////////////////////
 
-// MultiFilePipeline constructs blocks queue and implements data readers
-type MultiFilePipeline struct {
+// FileSystemSource constructs blocks queue and implements data readers
+type FileSystemSource struct {
     filesInfo           map[string]FileInfo
     totalNumberOfBlocks int
     totalSize           uint64
@@ -39,8 +39,8 @@ type FileInfo struct {
     NumOfBlocks int
 }
 
-//MultiFileParams parameters used to create a new instance of multi-file source pipeline
-type MultiFileParams struct {
+//FileSystemSourceParams parameters used to create a new instance of multi-file source pipeline
+type FileSystemSourceParams struct {
     SourcePatterns []string
     BlockSize      uint64
     TargetAliases  []string
@@ -50,11 +50,10 @@ type MultiFileParams struct {
     KeepDirStructure bool
 }
 
-// NewMultiFile creates a new MultiFilePipeline.
-// If the sourcePattern results in a single file, the targetAlias, if set, will be used as the target name.
-// Otherwise the full original file name will be used instead.
-//sourcePatterns []string, blockSize uint64, targetAliases []string, numOfPartitions int, md5 bool
-func NewMultiFile(params *MultiFileParams) []pipeline.SourcePipeline {
+// NewFileSystemSourcePipeline creates a new MultiFilePipeline.
+// If the sourcePattern results in a single file and the targetAlias is set, the alias will be used as the target name.
+// Otherwise the full original file name will be used.
+func NewFileSystemSourcePipeline(params *FileSystemSourceParams) []pipeline.SourcePipeline {
     var files []string
     var err error
     //get files from patterns
@@ -148,7 +147,7 @@ func newMultiFilePipeline(files []string, targetAliases []string, blockSize uint
 
     handlePool := internal.NewFileHandlePool(maxNumOfHandlesPerFile, internal.Read, false)
 
-    return &MultiFilePipeline{filesInfo: fileInfos,
+    return &FileSystemSource{filesInfo: fileInfos,
         totalNumberOfBlocks: totalNumberOfBlocks,
         blockSize:           blockSize,
         totalSize:           totalSize,
@@ -161,7 +160,7 @@ func newMultiFilePipeline(files []string, targetAliases []string, blockSize uint
 //ExecuteReader implements ExecuteReader from the pipeline.SourcePipeline Interface.
 //For each file the reader will maintain an open handle from which data will be read.
 // This implementation uses partitions (group of parts that can be read sequentially).
-func (f *MultiFilePipeline) ExecuteReader(partitionsQ chan pipeline.PartsPartition, partsQ chan pipeline.Part, readPartsQ chan pipeline.Part, id int, wg *sync.WaitGroup) {
+func (f *FileSystemSource) ExecuteReader(partitionsQ chan pipeline.PartsPartition, partsQ chan pipeline.Part, readPartsQ chan pipeline.Part, id int, wg *sync.WaitGroup) {
     var err error
     var partition pipeline.PartsPartition
 
@@ -230,7 +229,7 @@ func (f *MultiFilePipeline) ExecuteReader(partitionsQ chan pipeline.PartsPartiti
 
 //GetSourcesInfo implements GetSourcesInfo from the pipeline.SourcePipeline Interface.
 //Returns an array of SourceInfo with the name, alias and size of the files to be transferred.
-func (f *MultiFilePipeline) GetSourcesInfo() []pipeline.SourceInfo {
+func (f *FileSystemSource) GetSourcesInfo() []pipeline.SourceInfo {
 
     sources := make([]pipeline.SourceInfo, len(f.filesInfo))
     var i = 0
@@ -272,7 +271,7 @@ func createPartsFromSource(size uint64, sourceNumOfBlocks int, blockSize uint64,
 //ConstructBlockInfoQueue implements ConstructBlockInfoQueue from the pipeline.SourcePipeline Interface.
 // this implementation uses partitions to group parts into a set that can be read sequentially.
 // This is to avoid Window's memory pressure when calling SetFilePointer numerous times on the same handle
-func (f *MultiFilePipeline) ConstructBlockInfoQueue(blockSize uint64) (partitionsQ chan pipeline.PartsPartition, partsQ chan pipeline.Part, numOfBlocks int, size uint64) {
+func (f *FileSystemSource) ConstructBlockInfoQueue(blockSize uint64) (partitionsQ chan pipeline.PartsPartition, partsQ chan pipeline.Part, numOfBlocks int, size uint64) {
     numOfBlocks = f.totalNumberOfBlocks
     size = f.totalSize
     allPartitions := make([][]pipeline.PartsPartition, len(f.filesInfo))
```
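ConstructBlockInfoQueue computes parts arithmetically from each file's size and then groups them into partitions that one reader processes sequentially, which avoids repeated seeks on a single handle. A simplified sketch of that arithmetic; the types and the distribution policy are assumptions, not the real pipeline.PartsPartition logic:

```go
package main

import "fmt"

// part and partition sketch the grouping FileSystemSource relies on.
type part struct{ Offset, Bytes uint64 }

type partition struct{ Parts []part }

// makePartitions splits a file of the given size into blockSize-sized
// parts, then hands each partition a contiguous run of parts so that
// reads within a partition stay sequential.
func makePartitions(size, blockSize uint64, numPartitions int) []partition {
	var parts []part
	for off := uint64(0); off < size; off += blockSize {
		n := blockSize
		if off+n > size {
			n = size - off // last part may be short
		}
		parts = append(parts, part{Offset: off, Bytes: n})
	}

	partitions := make([]partition, numPartitions)
	per := (len(parts) + numPartitions - 1) / numPartitions
	for i := range partitions {
		lo, hi := i*per, (i+1)*per
		if lo > len(parts) {
			lo = len(parts)
		}
		if hi > len(parts) {
			hi = len(parts)
		}
		partitions[i].Parts = parts[lo:hi]
	}
	return partitions
}

func main() {
	// A 100-byte file in 16-byte blocks across 3 partitions: 3+3+1 parts.
	for i, p := range makePartitions(100, 16, 3) {
		fmt.Printf("partition %d: %d parts\n", i, len(p.Parts))
	}
}
```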

sources/ostorefactory.go

Lines changed: 16 additions & 16 deletions

```diff
@@ -7,16 +7,16 @@ import (
     "github.com/Azure/blobporter/pipeline"
 )
 
-//AzureBlob constructs parts channel and implements data readers for Azure Blobs exposed via HTTP
-type AzureBlob struct {
-    HTTPPipeline
+//AzureBlobSource constructs parts channel and implements data readers for Azure Blobs exposed via HTTP
+type AzureBlobSource struct {
+    HTTPSource
     Container      string
     BlobNames      []string
     exactNameMatch bool
 }
 
-//NewAzureBlob creates a new instance of the HTTPPipeline for Azure Blobs
-func NewAzureBlob(params *AzureBlobParams) []pipeline.SourcePipeline {
+//NewAzureBlobSourcePipeline creates a new instance of the HTTPPipeline for Azure Blobs
+func NewAzureBlobSourcePipeline(params *AzureBlobParams) []pipeline.SourcePipeline {
     var err error
     var azObjStorage objectListManager
     azObjStorage = newazBlobInfoProvider(params)
@@ -25,10 +25,10 @@ func NewAzureBlob(params *AzureBlobParams) []pipeline.SourcePipeline {
         log.Fatal(fmt.Errorf("Invalid operation. The number of files per batch must be greater than zero"))
     }
 
-    factory := func(httpSource HTTPPipeline) (pipeline.SourcePipeline, error) {
-        return &AzureBlob{Container: params.Container,
+    factory := func(httpSource HTTPSource) (pipeline.SourcePipeline, error) {
+        return &AzureBlobSource{Container: params.Container,
             BlobNames: params.BlobNames,
-            HTTPPipeline: httpSource,
+            HTTPSource: httpSource,
             exactNameMatch: params.SourceParams.UseExactNameMatch}, nil
     }
 
@@ -40,14 +40,14 @@ func NewAzureBlob(params *AzureBlobParams) []pipeline.SourcePipeline {
     return pipelines
 }
 
-//S3Pipeline S3 source HTTP based pipeline
-type S3Pipeline struct {
-    HTTPPipeline
+//S3Source S3 source HTTP based pipeline
+type S3Source struct {
+    HTTPSource
     exactNameMatch bool
 }
 
-//NewS3Pipeline creates a new instance of the HTTPPipeline for S3
-func NewS3Pipeline(params *S3Params) []pipeline.SourcePipeline {
+//NewS3SourcePipeline creates a new instance of the HTTPPipeline for S3
+func NewS3SourcePipeline(params *S3Params) []pipeline.SourcePipeline {
     var err error
     var s3ObjStorage objectListManager
     s3ObjStorage, err = newS3InfoProvider(params)
@@ -60,9 +60,9 @@ func NewS3Pipeline(params *S3Params) []pipeline.SourcePipeline {
         log.Fatal(fmt.Errorf("Invalid operation. The number of files per batch must be greater than zero"))
     }
 
-    factory := func(httpSource HTTPPipeline) (pipeline.SourcePipeline, error) {
-        return &S3Pipeline{
-            HTTPPipeline: httpSource,
+    factory := func(httpSource HTTPSource) (pipeline.SourcePipeline, error) {
+        return &S3Source{
+            HTTPSource: httpSource,
             exactNameMatch: params.SourceParams.UseExactNameMatch}, nil
     }
 
```
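Both AzureBlobSource and S3Source embed HTTPSource, so they inherit its reader methods and satisfy pipeline.SourcePipeline while adding only their own fields. A minimal sketch of this embedding pattern, with stand-in names rather than the real types:

```go
package main

import "fmt"

// sourcePipeline stands in for pipeline.SourcePipeline.
type sourcePipeline interface{ GetSourcesInfo() []string }

// httpSource stands in for HTTPSource and implements the interface.
type httpSource struct{ sources []string }

func (h *httpSource) GetSourcesInfo() []string { return h.sources }

// azureBlobSource embeds httpSource, so *azureBlobSource satisfies
// sourcePipeline without re-implementing the reader methods; it only
// adds its own fields, mirroring AzureBlobSource and S3Source.
type azureBlobSource struct {
	httpSource
	container string
}

func main() {
	var p sourcePipeline = &azureBlobSource{
		httpSource: httpSource{sources: []string{"blob1", "blob2"}},
		container:  "data",
	}
	fmt.Println(p.GetSourcesInfo()) // [blob1 blob2]
}
```

Method promotion through the embedded field is what lets the factory closures above return &AzureBlobSource{...} and &S3Source{...} directly as pipeline.SourcePipeline values.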

targets/azureblock.go

Lines changed: 9 additions & 9 deletions

```diff
@@ -14,14 +14,14 @@ import (
 ///// AzureBlock Target
 ////////////////////////////////////////////////////////////
 
-//AzureBlock represents an Azure Block target
-type AzureBlock struct {
+//AzureBlockTarget represents an Azure Block target
+type AzureBlockTarget struct {
     container string
     azutil    *util.AzUtil
 }
 
-//NewAzureBlockPipeline TODO
-func NewAzureBlockPipeline(params AzureTargetParams) pipeline.TargetPipeline {
+//NewAzureBlockTargetPipeline TODO
+func NewAzureBlockTargetPipeline(params AzureTargetParams) pipeline.TargetPipeline {
 
     azutil, err := util.NewAzUtil(params.AccountName, params.AccountKey, params.Container, params.BaseBlobURL)
 
@@ -40,12 +40,12 @@ func NewAzureBlockPipeline(params AzureTargetParams) pipeline.TargetPipeline {
         log.Fatal(err)
     }
 
-    return &AzureBlock{azutil: azutil, container: params.Container}
+    return &AzureBlockTarget{azutil: azutil, container: params.Container}
 }
 
 //CommitList implements CommitList from the pipeline.TargetPipeline interface.
 //Commits the list of blocks to Azure Storage to finalize the transfer.
-func (t *AzureBlock) CommitList(listInfo *pipeline.TargetCommittedListInfo, numberOfBlocks int, targetName string) (msg string, err error) {
+func (t *AzureBlockTarget) CommitList(listInfo *pipeline.TargetCommittedListInfo, numberOfBlocks int, targetName string) (msg string, err error) {
     startTime := time.Now()
 
     //Only commit if the number of blocks is greater than one.
@@ -80,7 +80,7 @@ func convertToBase64EncodedList(list interface{}, numOfBlocks int) []string {
 
 //PreProcessSourceInfo implementation of PreProcessSourceInfo from the pipeline.TargetPipeline interface.
 //Checks if uncommitted blocks are present and cleans them by creating an empty blob.
-func (t *AzureBlock) PreProcessSourceInfo(source *pipeline.SourceInfo, blockSize uint64) (err error) {
+func (t *AzureBlockTarget) PreProcessSourceInfo(source *pipeline.SourceInfo, blockSize uint64) (err error) {
     numOfBlocks := int(source.Size+(blockSize-1)) / int(blockSize)
 
     if numOfBlocks > util.MaxBlockCount { // more than 50,000 blocks needed, so can't work
@@ -94,7 +94,7 @@ func (t *AzureBlock) PreProcessSourceInfo(source *pipeline.SourceInfo, blockSize
 //ProcessWrittenPart implements ProcessWrittenPart from the pipeline.TargetPipeline interface.
 //Appends the written part to a list. If the part is duplicated the list is updated with a reference to the first occurrence of the block.
 //If the first occurrence has not yet been processed, the part is requested to be placed back in the results channel (requeue == true).
-func (t *AzureBlock) ProcessWrittenPart(result *pipeline.WorkerResult, listInfo *pipeline.TargetCommittedListInfo) (requeue bool, err error) {
+func (t *AzureBlockTarget) ProcessWrittenPart(result *pipeline.WorkerResult, listInfo *pipeline.TargetCommittedListInfo) (requeue bool, err error) {
     requeue = false
     blockList := convertToBase64EncodedList((*listInfo).List, result.NumberOfBlocks)
 
@@ -116,7 +116,7 @@ func (t *AzureBlock) ProcessWrittenPart(result *pipeline.WorkerResult, listInfo
 
 //WritePart implements WritePart from the pipeline.TargetPipeline interface.
 //Performs a PUT block operation with the data contained in the part.
-func (t *AzureBlock) WritePart(part *pipeline.Part) (duration time.Duration, startTime time.Time, numOfRetries int, err error) {
+func (t *AzureBlockTarget) WritePart(part *pipeline.Part) (duration time.Duration, startTime time.Time, numOfRetries int, err error) {
     startTime = time.Now()
     defer func() { duration = time.Now().Sub(startTime) }()
     util.PrintfIfDebug("WritePart -> blockid:%v read:%v name:%v err:%v", part.BlockID, len(part.Data), part.TargetAlias, err)
```