Skip to content

Commit 5e525cd

Browse files
committed
- handleman for target pipeline
- keep dir structure is now true by default - remove -p, and added -i option - debug mode for handleman
1 parent 8bd15c4 commit 5e525cd

File tree

9 files changed

+250
-165
lines changed

9 files changed

+250
-165
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ Sources and targets are decoupled, this design enables the composition of variou
2929
Download, extract and set permissions:
3030

3131
```bash
32-
wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.5.25/bp_linux.tar.gz
32+
wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.6.03/bp_linux.tar.gz
3333
tar -xvf bp_linux.tar.gz linux_amd64/blobporter
3434
chmod +x ~/linux_amd64/blobporter
3535
cd ~/linux_amd64
@@ -46,7 +46,7 @@ export ACCOUNT_KEY=<STORAGE_ACCOUNT_KEY>
4646
4747
### Windows
4848

49-
Download [BlobPorter.exe](https://github.com/Azure/blobporter/releases/download/v0.5.25/bp_windows.zip)
49+
Download [BlobPorter.exe](https://github.com/Azure/blobporter/releases/download/v0.6.03/bp_windows.zip)
5050

5151
Set environment variables (if using the command prompt):
5252

args.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ type arguments struct {
6565
quietMode bool
6666
calculateMD5 bool
6767
exactNameMatch bool
68-
keepDirStructure bool
68+
removeDirStructure bool
6969
hTTPClientTimeout int
7070
numberOfHandlesPerFile int //numberOfHandlesPerFile = defaultNumberOfHandlesPerFile
7171
numberOfFilesInBatch int //numberOfFilesInBatch = defaultNumberOfFilesInBatch
@@ -121,15 +121,15 @@ func newParamParserValidator() paramParserValidator {
121121
var defaultNumberOfWorkers = runtime.NumCPU() * numOfWorkersFactor
122122
var defaultNumberOfReaders = runtime.NumCPU() * numOfReadersFactor
123123

124-
if defaultNumberOfWorkers > 50 {
125-
defaultNumberOfWorkers = 50
124+
if defaultNumberOfWorkers > 60 {
125+
defaultNumberOfWorkers = 60
126126
}
127-
if defaultNumberOfReaders > 80 {
128-
defaultNumberOfReaders = 80
127+
if defaultNumberOfReaders > 50 {
128+
defaultNumberOfReaders = 50
129129
}
130130

131131
args := &arguments{
132-
keepDirStructure: true,
132+
removeDirStructure: false,
133133
numberOfReaders: defaultNumberOfReaders,
134134
numberOfWorkers: defaultNumberOfWorkers,
135135
blockSizeStr: defaultBlockSizeStr,
@@ -192,7 +192,8 @@ func (p *paramParserValidator) getTargetRules() ([]parseAndValidationRule, error
192192
switch p.targetSegment {
193193
case transfer.File:
194194
return []parseAndValidationRule{
195-
p.pvNumOfHandlesPerFile}, nil
195+
p.pvNumOfHandlesPerFile,
196+
p.pvKeepDirStructueIsFalseWarning}, nil
196197
case transfer.PageBlob:
197198
return []parseAndValidationRule{
198199
p.pvTargetContainerIsReq,
@@ -249,7 +250,7 @@ func (p *paramParserValidator) getSourceRules() ([]parseAndValidationRule, error
249250

250251
//Global rules....
251252
func (p *paramParserValidator) pvgKeepDirectoryStructure() error {
252-
p.params.keepDirStructure = p.args.keepDirStructure
253+
p.params.keepDirStructure = !p.args.removeDirStructure
253254
return nil
254255
}
255256
func (p *paramParserValidator) pvgQuietMode() error {
@@ -544,6 +545,14 @@ func (p *paramParserValidator) pvTargetBlobAuthInfoIsReq() error {
544545
return nil
545546
}
546547

548+
func (p *paramParserValidator) pvKeepDirStructueIsFalseWarning() error {
549+
if !p.params.keepDirStructure && !p.params.quietMode {
550+
fmt.Printf("Warning!\nThe directory structure from the source won't be created.\nFiles with the same file name will be overwritten.\n")
551+
}
552+
553+
return nil
554+
}
555+
547556
func (p *paramParserValidator) runParseAndValidationRules(rules ...parseAndValidationRule) error {
548557
for i := 0; i < len(rules); i++ {
549558
val := rules[i]

blobporter.go

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,13 @@ import (
99
"os"
1010
"strconv"
1111
"sync/atomic"
12-
"time"
1312

1413
"github.com/Azure/blobporter/pipeline"
1514
"github.com/Azure/blobporter/transfer"
1615
"github.com/Azure/blobporter/util"
1716
)
1817

19-
const programVersion = "0.6.03"
18+
const programVersion = "0.6.05"
2019

2120
var argsUtil paramParserValidator
2221

@@ -46,7 +45,7 @@ func init() {
4645
dupcheckLevelMsg = "Desired level of effort to detect duplicate data to minimize upload size.\n\tMust be one of " + transfer.DupeCheckLevelStr
4746
transferDefMsg = "Defines the type of source and target in the transfer.\n\tMust be one of:\n\tfile-blockblob, file-pageblob, http-blockblob, http-pageblob, blob-file,\n\tpageblock-file (alias of blob-file), blockblob-file (alias of blob-file)\n\tor http-file."
4847
exactNameMatchMsg = "If set or true only blobs that match the name exactly will be downloaded."
49-
keepDirStructureMsg = "Keeps the directory structure from the source.\n\tNot applicable when the source is a HTTP endpoint."
48+
removeDirStructureMsg = "If set the directory structure from the source won't be kept.\n\tNot applicable when the source is a HTTP endpoint."
5049
numberOfHandlersPerFileMsg = "Number of open handles for concurrent reads and writes per file."
5150
numberOfFilesInBatchMsg = "Maximum number of files in a transfer.\n\tIf the number is exceeded new transfers are created"
5251
)
@@ -67,7 +66,7 @@ func init() {
6766
util.PrintUsageDefaults("d", "dup_check_level", argsUtil.args.dedupeLevelOptStr, dupcheckLevelMsg)
6867
util.PrintUsageDefaults("t", "transfer_definition", argsUtil.args.transferDefStr, transferDefMsg)
6968
util.PrintUsageDefaults("e", "exact_name", "false", exactNameMatchMsg)
70-
util.PrintUsageDefaults("p", "keep_directories", "true", keepDirStructureMsg)
69+
util.PrintUsageDefaults("i", "remove_directories", "false", removeDirStructureMsg)
7170
util.PrintUsageDefaults("h", "handles_per_file", strconv.Itoa(argsUtil.args.numberOfHandlesPerFile), numberOfHandlersPerFileMsg)
7271
util.PrintUsageDefaults("x", "files_per_transfer", strconv.Itoa(argsUtil.args.numberOfFilesInBatch), numberOfFilesInBatchMsg)
7372

@@ -88,7 +87,7 @@ func init() {
8887
util.StringVarAlias(&argsUtil.args.dedupeLevelOptStr, "d", "dup_check_level", argsUtil.args.dedupeLevelOptStr, dupcheckLevelMsg)
8988
util.StringVarAlias(&argsUtil.args.transferDefStr, "t", "transfer_definition", argsUtil.args.transferDefStr, transferDefMsg)
9089
util.BoolVarAlias(&argsUtil.args.exactNameMatch, "e", "exact_name", false, exactNameMatchMsg)
91-
util.BoolVarAlias(&argsUtil.args.keepDirStructure, "p", "keep_directories", true, keepDirStructureMsg)
90+
util.BoolVarAlias(&argsUtil.args.removeDirStructure, "i", "remove_directories", false, removeDirStructureMsg)
9291
util.IntVarAlias(&argsUtil.args.numberOfHandlesPerFile, "h", "handles_per_file", defaultNumberOfHandlesPerFile, numberOfHandlersPerFileMsg)
9392
util.IntVarAlias(&argsUtil.args.numberOfFilesInBatch, "x", "files_per_transfer", defaultNumberOfFilesInBatch, numberOfFilesInBatchMsg)
9493
}
@@ -98,7 +97,7 @@ var targetRetries int32
9897

9998
func displayFilesToTransfer(sourcesInfo []pipeline.SourceInfo, numOfBatches int, batchNumber int) {
10099
if numOfBatches == 1 {
101-
fmt.Printf("Files to Transfer (%v) :\n ", argsUtil.params.transferType)
100+
fmt.Printf("Files to Transfer (%v) :\n", argsUtil.params.transferType)
102101
var totalSize uint64
103102
summary := ""
104103

@@ -112,7 +111,7 @@ func displayFilesToTransfer(sourcesInfo []pipeline.SourceInfo, numOfBatches int,
112111
totalSize = totalSize + source.Size
113112
}
114113

115-
if len(sourcesInfo) < 50 {
114+
if len(sourcesInfo) < 20 {
116115
fmt.Printf(summary)
117116
return
118117
}
@@ -125,18 +124,6 @@ func displayFilesToTransfer(sourcesInfo []pipeline.SourceInfo, numOfBatches int,
125124
fmt.Printf("\nBatch transfer (%v).\nFiles per Batch: %v.\nBatch: %v of %v\n ", argsUtil.params.transferType, len(sourcesInfo), batchNumber+1, numOfBatches)
126125
}
127126

128-
func displayFinalWrapUpSummary(duration time.Duration, targetRetries int32, threadTarget int, totalNumberOfBlocks int, totalSize uint64, cumWriteDuration time.Duration) {
129-
var netMB float64 = 1000000
130-
fmt.Printf("\nThe transfer took %v to run.\n", duration)
131-
MBs := float64(totalSize) / netMB / duration.Seconds()
132-
fmt.Printf("Throughput: %1.2f MB/s (%1.2f Mb/s) \n", MBs, MBs*8)
133-
fmt.Printf("Configuration: R=%d, W=%d, DataSize=%s, Blocks=%d\n",
134-
argsUtil.params.numberOfReaders, argsUtil.params.numberOfWorkers, util.PrintSize(totalSize), totalNumberOfBlocks)
135-
fmt.Printf("Cumulative Writes Duration: Total=%v, Avg Per Worker=%v\n",
136-
cumWriteDuration, time.Duration(cumWriteDuration.Nanoseconds()/int64(argsUtil.params.numberOfWorkers)))
137-
fmt.Printf("Retries: Avg=%v Total=%v\n", float32(targetRetries)/float32(totalNumberOfBlocks), targetRetries)
138-
}
139-
140127
func main() {
141128

142129
flag.Parse()

files.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#! /bin/bash
# Generates test files file051.bin .. file200.bin in the current directory,
# each filled with 50000 KiB (51,200,000 bytes) of random data.
#
# The files are written concurrently, one background dd per file.
for n in {51..200}; do
    name="file$(printf '%03d' "$n").bin"
    # 1 KiB blocks x 50000 produce the same size as bs=1 count=$((50000 * 1024))
    # without issuing one read/write syscall pair per byte.
    dd if=/dev/urandom of="$name" bs=1024 count=50000 &
done

# Do not return until every background dd has finished, so callers never see
# partially written files.
wait

sources/http.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package sources
33
import (
44
"io/ioutil"
55
"log"
6+
"net"
67
"strconv"
78
"strings"
89
"sync"
10+
"time"
911

1012
"fmt"
1113
"net/http"
@@ -55,7 +57,7 @@ func newHTTPSource(sourceListManager objectListManager, pipelineFactory sourceHT
5557
numOfFilesInBatch = filesSent
5658
}
5759

58-
httpSource := HTTPPipeline{Sources: sourceInfos[start : start+numOfFilesInBatch], HTTPClient: util.NewHTTPClient(), includeMD5: includeMD5}
60+
httpSource := HTTPPipeline{Sources: sourceInfos[start : start+numOfFilesInBatch], HTTPClient: httpSourceHTTPClient, includeMD5: includeMD5}
5961

6062
pipelines[b], err = pipelineFactory(httpSource)
6163

@@ -92,7 +94,7 @@ func NewHTTP(sourceURIs []string, targetAliases []string, md5 bool) pipeline.Sou
9294
TargetAlias: targetAlias,
9395
SourceName: sourceURIs[i]}
9496
}
95-
return &HTTPPipeline{Sources: sources, HTTPClient: util.NewHTTPClient(), includeMD5: md5}
97+
return &HTTPPipeline{Sources: sources, HTTPClient: httpSourceHTTPClient, includeMD5: md5}
9698
}
9799

98100
func getSourceSize(sourceURI string) (size int) {
@@ -207,7 +209,6 @@ func (f *HTTPPipeline) ExecuteReader(partitionsQ chan pipeline.PartsPartition, p
207209
if res != nil && res.Body != nil {
208210
res.Body.Close()
209211
}
210-
f.HTTPClient = util.NewHTTPClient()
211212

212213
util.PrintfIfDebug("ExecuteReader -> blockid:%v toread:%v status:%v err:%v head:%v", p.BlockID, p.BytesToRead, status, err, header)
213214

@@ -267,3 +268,29 @@ func (f *HTTPPipeline) ConstructBlockInfoQueue(blockSize uint64) (partitionsQ ch
267268

268269
return
269270
}
271+
272+
const (
273+
maxIdleConns = 50
274+
maxIdleConnsPerHost = 50
275+
)
276+
277+
var httpSourceHTTPClient = newSourceHTTPClient()
278+
279+
func newSourceHTTPClient() *http.Client {
280+
return &http.Client{
281+
Timeout: time.Duration(util.HTTPClientTimeout) * time.Second,
282+
Transport: &http.Transport{
283+
Dial: (&net.Dialer{
284+
Timeout: 30 * time.Second, // dial timeout
285+
KeepAlive: 30 * time.Second,
286+
DualStack: true,
287+
}).Dial,
288+
MaxIdleConns: maxIdleConns,
289+
MaxIdleConnsPerHost: maxIdleConnsPerHost,
290+
IdleConnTimeout: 90 * time.Second,
291+
TLSHandshakeTimeout: 30 * time.Second,
292+
ExpectContinueTimeout: 1 * time.Second,
293+
DisableKeepAlives: false,
294+
DisableCompression: false,
295+
MaxResponseHeaderBytes: 0}}
296+
}

0 commit comments

Comments
 (0)