
Commit 6377ffb

Merge pull request #68 from Azure/dev
v0.5.12
2 parents 1690d12 + 6cc7e56

File tree

9 files changed: +85 -37 lines

.vscode/launch.json

Lines changed: 3 additions & 3 deletions
```diff
@@ -12,10 +12,10 @@
             "host": "127.0.0.1",
             "program": "${fileDirname}",
             "env": {
-                "ACCOUNT_NAME":"storagejaa",
-                "ACCOUNT_KEY":"/mXWG4aXMWvftzR+Ed5URccwrDSvvv5cUsKCqq5gbyPXvtlyfQ9gjq462kuH/2dErjwR8QcAS74vLaNXynd74g=="
+                "ACCOUNT_NAME":"bprelease",
+                "ACCOUNT_KEY":"bGgXq0WvdkGRKe38Icww4i+3NUsb4E0ngm9tImN/liiB9uix/Wwgua45x+uP6A6EQCv9QIAyk73mlcn2bJ+vkQ=="
             },
-            "args": ["-c", "many", "-t","blob-file","-p"] ,
+            "args": ["-c","100k","-t","blob-file","-p","-x","300"] ,
             "showLog": true
         }
     ]
```
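For reference, the updated debug configuration corresponds roughly to the following shell invocation; this is illustrative only, with the container name (`100k`) and batch size taken from the `args` array above and the account key redacted:

```bash
export ACCOUNT_NAME=bprelease
export ACCOUNT_KEY=<STORAGE_ACCOUNT_KEY>
# -c container, -t transfer type, -p keep directories, -x files per batch
./blobporter -c 100k -t blob-file -p -x 300
```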

README.md

Lines changed: 8 additions & 8 deletions
````diff
@@ -29,7 +29,7 @@ Sources and targets are decoupled, this design enables the composition of variou
 Download, extract and set permissions:

 ```bash
-wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.5.10/bp_linux.tar.gz
+wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.5.12/bp_linux.tar.gz
 tar -xvf bp_linux.tar.gz linux_amd64/blobporter
 chmod +x ~/linux_amd64/blobporter
 cd ~/linux_amd64
@@ -46,7 +46,7 @@ export ACCOUNT_KEY=<STORAGE_ACCOUNT_KEY>

 ### Windows

-Download [BlobPorter.exe](https://github.com/Azure/blobporter/releases/download/v0.5.10/bp_windows.zip)
+Download [BlobPorter.exe](https://github.com/Azure/blobporter/releases/download/v0.5.12/bp_windows.zip)

 Set environment variables (if using the command prompt):

@@ -175,9 +175,11 @@ By default files are downloaded to the same directory where you are running blob

 - `q`, `--quiet_mode` *bool* if present or true, the progress indicator is not displayed. The files to transfer, errors, warnings and transfer completion summary is still displayed.

-- `x`, `---files_per_transfer` *int* number of files in a batch transfer. Default is 200.
+- `x`, `--files_per_transfer` *int* number of files in a batch transfer. Default is 200.

-- `h`, `----handles_per_file` *int* number of open handles for concurrent reads and writes per file. Default is 2.
+- `h`, `--handles_per_file` *int* number of open handles for concurrent reads and writes per file. Default is 2.
+
+- `p`, `--keep_directories` *bool* if set blobs are downloaded or uploaded keeping the directory structure from the source. Not applicable when the source is a HTTP endpoint.

 ## Performance Considerations

@@ -189,11 +191,9 @@ By default, BlobPorter creates 5 readers and 8 workers for each core on the comp

 - For transfers from fast disks (SSD) or HTTP sources reducing the number readers or workers could provide better performance than the default values. Reduce these values if you want to minimize resource utilization. Lowering these numbers reduces contention and the likelihood of experiencing throttling conditions.

-- Starting with version 0.5.10:
-
-  - Transfers are batched. Each batch transfer will concurrently read and transfer up to 200 files (default value) from the source. The batch size can be modified using the -x option, the maximum value is 500.
+- Transfers can be batched. Each batch transfer will concurrently read and transfer up to 200 files (default value) from the source. The batch size can be modified using the -x option, the maximum value is 500.

-  - Blobs smaller than the block size are transferred in a single operation. With relatively small files (<32MB) performance may be better if you set a block size equal to the size of the files. Setting the number of workers and readers to the number of files could yield performance gains.
+- Blobs smaller than the block size are transferred in a single operation. With relatively small files (<32MB) performance may be better if you set a block size equal to the size of the files. Setting the number of workers and readers to the number of files could yield performance gains.

 ## Issues and Feedback
````
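To make the two documented options concrete, here is a hedged example of how they combine on an upload. The `-f` source flag and overall invocation shape are assumed from the rest of the README; the paths and container name are hypothetical:

```bash
# Upload everything under data/, preserving the directory structure in
# the blob names (-p), in batches of at most 300 files per transfer (-x).
blobporter -f "data/*" -c mycontainer -t file-blockblob -p -x 300
```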

blobporter.go

Lines changed: 4 additions & 2 deletions
```diff
@@ -49,7 +49,7 @@ const (
 	// User can use environment variables to specify storage account information
 	storageAccountNameEnvVar = "ACCOUNT_NAME"
 	storageAccountKeyEnvVar  = "ACCOUNT_KEY"
-	programVersion           = "0.5.10" // version number to show in help
+	programVersion           = "0.5.12" // version number to show in help
 )

 const numOfWorkersFactor = 8
@@ -87,7 +87,7 @@ func init() {
 	dupcheckLevelMsg = "Desired level of effort to detect duplicate data to minimize upload size.\n\tMust be one of " + transfer.DupeCheckLevelStr
 	transferDefMsg = "Defines the type of source and target in the transfer.\n\tMust be one of:\n\tfile-blockblob, file-pageblob, http-blockblob, http-pageblob, blob-file,\n\tpageblock-file (alias of blob-file), blockblob-file (alias of blob-file)\n\tor http-file."
 	exactNameMatchMsg = "If set or true only blobs that match the name exactly will be downloaded."
-	keepDirStructureMsg = "If set blobs are downloaded to the same directory structure as in the storage account.\n\tIf the directory structure does not exists it will be created."
+	keepDirStructureMsg = "If set blobs are downloaded or uploaded keeping the directory structure from the source.\n\tNot applicable when the source is a HTTP endpoint."
 	numberOfHandlersPerFileMsg = "Number of open handles for concurrent reads and writes per file."
 	numberOfFilesInBatchMsg = "Maximum number of files in a transfer.\n\tIf the number is exceeded new transfers are created"
 )
@@ -257,6 +257,7 @@ func getFileToPagePipelines() (source []pipeline.SourcePipeline, target pipeline
 		TargetAliases:    blobNames,
 		NumOfPartitions:  numberOfReaders,
 		MD5:              calculateMD5,
+		KeepDirStructure: keepDirStructure,
 		FilesPerPipeline: numberOfFilesInBatch}

 	source = sources.NewMultiFile(params)
@@ -275,6 +276,7 @@ func getFileToBlockPipelines() (source []pipeline.SourcePipeline, target pipelin
 		TargetAliases:    blobNames,
 		NumOfPartitions:  numberOfReaders,
 		MD5:              calculateMD5,
+		KeepDirStructure: keepDirStructure,
 		FilesPerPipeline: numberOfFilesInBatch}

 	source = sources.NewMultiFile(params)
```
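Both hunks have the same shape: the new `KeepDirStructure` field is threaded into the `MultiFileParams` literal handed to `sources.NewMultiFile`. A minimal sketch of the resulting parameter block, using only field names visible in this diff (the surrounding variable names are illustrative):

```go
params := &sources.MultiFileParams{
	BlockSize:        blockSize,            // bytes per block
	TargetAliases:    blobNames,            // optional per-file target names
	NumOfPartitions:  numberOfReaders,      // concurrent readers
	MD5:              calculateMD5,
	KeepDirStructure: keepDirStructure,     // new in this commit (-p flag)
	FilesPerPipeline: numberOfFilesInBatch, // batch size (-x flag)
}
source := sources.NewMultiFile(params)
```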

sources/http.go

Lines changed: 7 additions & 7 deletions
```diff
@@ -142,14 +142,15 @@ func (f *HTTPPipeline) ExecuteReader(partitionsQ chan pipeline.PartsPartition, p
 			return // no more blocks of file data to be read
 		}

-		if req, err = http.NewRequest("GET", p.SourceURI, nil); err != nil {
-			log.Fatal(err)
-		}
+		util.RetriableOperation(func(r int) error {
+			if req, err = http.NewRequest("GET", p.SourceURI, nil); err != nil {
+				log.Fatal(err)
+			}

-		header := fmt.Sprintf("bytes=%v-%v", p.Offset, p.Offset-1+uint64(p.BytesToRead))
-		req.Header.Set("Range", header)
+			header := fmt.Sprintf("bytes=%v-%v", p.Offset, p.Offset-1+uint64(p.BytesToRead))
+			req.Header.Set("Range", header)
+			req.Close = true

-		util.RetriableOperation(func(r int) error {
 			if res, err = f.HTTPClient.Do(req); err != nil || res.StatusCode != 206 {
 				var status int
 				if res != nil {
@@ -162,7 +163,6 @@ func (f *HTTPPipeline) ExecuteReader(partitionsQ chan pipeline.PartsPartition, p
 			}
 			return err
 		}
-
 		p.Data, err = ioutil.ReadAll(res.Body)
 		res.Body.Close()
 		if err != nil {
```
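The key change here is scope: request construction now lives inside the retried closure, so every attempt builds a fresh `*http.Request` instead of reusing a consumed one, and `req.Close = true` keeps a retry from going out over a possibly broken keep-alive connection. A self-contained sketch of the pattern; `retriable` is a stand-in for `util.RetriableOperation`, whose exact signature and retry policy are assumptions:

```go
package main

import (
	"fmt"
	"net/http"
)

// retriable retries op until it succeeds or maxRetries is exhausted.
func retriable(op func(attempt int) error) error {
	const maxRetries = 5
	var err error
	for r := 1; r <= maxRetries; r++ {
		if err = op(r); err == nil {
			return nil
		}
	}
	return err
}

func main() {
	err := retriable(func(r int) error {
		// Build the request inside the closure: each retry gets a fresh one.
		req, err := http.NewRequest("GET", "https://example.com/data", nil)
		if err != nil {
			return err
		}
		req.Header.Set("Range", "bytes=0-1023")
		req.Close = true // don't reuse this connection across attempts

		res, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}
		defer res.Body.Close()
		if res.StatusCode != http.StatusPartialContent { // expect 206
			return fmt.Errorf("unexpected status: %d", res.StatusCode)
		}
		return nil
	})
	fmt.Println("result:", err)
}
```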

sources/multifile.go

Lines changed: 14 additions & 4 deletions
```diff
@@ -45,6 +45,7 @@ type MultiFileParams struct {
 	NumOfPartitions  int
 	MD5              bool
 	FilesPerPipeline int
+	KeepDirStructure bool
 }

 // NewMultiFile creates a new MultiFilePipeline.
@@ -87,14 +88,19 @@ func NewMultiFile(params *MultiFileParams) []pipeline.SourcePipeline {
 		if len(params.TargetAliases) == len(files) {
 			targetAlias = params.TargetAliases[start : start+numOfFilesInBatch]
 		}
-		pipelines[b] = newMultiFilePipeline(files[start:start+numOfFilesInBatch], targetAlias, params.BlockSize, params.NumOfPartitions, params.MD5)
+		pipelines[b] = newMultiFilePipeline(files[start:start+numOfFilesInBatch],
+			targetAlias,
+			params.BlockSize,
+			params.NumOfPartitions,
+			params.MD5,
+			params.KeepDirStructure)
 		filesSent = filesSent - numOfFilesInBatch
 	}

 	return pipelines
 }

-func newMultiFilePipeline(files []string, targetAliases []string, blockSize uint64, numOfPartitions int, md5 bool) pipeline.SourcePipeline {
+func newMultiFilePipeline(files []string, targetAliases []string, blockSize uint64, numOfPartitions int, md5 bool, keepDirStructure bool) pipeline.SourcePipeline {
 	totalNumberOfBlocks := 0
 	var totalSize uint64
 	var err error
@@ -116,12 +122,16 @@ func newMultiFilePipeline(files []string, targetAliases []string, blockSize uint
 		totalSize = totalSize + uint64(fileStat.Size())
 		totalNumberOfBlocks = totalNumberOfBlocks + numOfBlocks

-		//use the param instead of the original filename only when single file
-		//transfer occurs.
+		//use the param instead of the original filename only when
+		//the number of targets matches the number files to transfer
 		if useTargetAlias {
 			sName = targetAliases[f]
 		} else {
 			sName = fileStat.Name()
+			if keepDirStructure {
+				sName = files[f]
+			}
+
 		}

 		fileInfo := FileInfo{FileStats: &fileStat, SourceURI: files[f], TargetAlias: sName, NumOfBlocks: numOfBlocks}
```
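The effect of the new `keepDirStructure` argument on the target name is easiest to see with a concrete path. A small runnable illustration, where `filepath.Base` plays the role of `fileStat.Name()` and the path is hypothetical:

```go
package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	src := "data/logs/app.log" // files[f] in the pipeline above
	for _, keep := range []bool{false, true} {
		sName := filepath.Base(src) // default: flat blob name ("app.log")
		if keep {
			sName = src // -p: blob name keeps the relative directory path
		}
		fmt.Printf("keepDirStructure=%v -> %q\n", keep, sName)
	}
}
```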

targets/azureblock.go

Lines changed: 10 additions & 9 deletions
```diff
@@ -58,6 +58,7 @@ func (t *AzureBlock) CommitList(listInfo *pipeline.TargetCommittedListInfo, numb
 	//if the max retries is exceeded, panic will happen, hence no error is returned.
 	duration, _, _ := util.RetriableOperation(func(r int) error {
 		if err := t.StorageClient.PutBlockList(t.Container, targetName, blockList); err != nil {
+			t.resetClient()
 			return err
 		}
 		return nil
@@ -126,9 +127,8 @@ func (t *AzureBlock) WritePart(part *pipeline.Part) (duration time.Duration, sta
 			uint64(len(part.Data)),
 			bytes.NewReader(part.Data),
 			headers); err != nil {
-			if util.Verbose {
-				fmt.Printf("EH|S|%v|%v|%v|%v\n", (*part).BlockID, len((*part).Data), (*part).TargetAlias, err)
-			}
+			util.PrintfIfDebug("Error|S|%v|%v|%v|%v", (*part).BlockID, len((*part).Data), (*part).TargetAlias, err)
+			t.resetClient()
 			return err
 		}
 		return nil
@@ -140,16 +140,17 @@ func (t *AzureBlock) WritePart(part *pipeline.Part) (duration time.Duration, sta
 			uint64(len(part.Data)),
 			part.NewBuffer(),
 			headers); err != nil {
-			if util.Verbose {
-				fmt.Printf("EH|S|%v|%v|%v|%v\n", (*part).BlockID, len((*part).Data), (*part).TargetAlias, err)
-			}
+			util.PrintfIfDebug("Error|S|%v|%v|%v|%v", (*part).BlockID, len((*part).Data), (*part).TargetAlias, err)
+			t.resetClient()
 			return err
 		}

-		if util.Verbose {
-			fmt.Printf("OKA|S|%v|%v|%v|%v\n", (*part).BlockID, len((*part).Data), (*part).TargetAlias, err)
-		}
+		util.PrintfIfDebug("OK|S|%v|%v|%v|%v", (*part).BlockID, len((*part).Data), (*part).TargetAlias, err)
 		return nil
 	})
 	return
 }
+
+func (t *AzureBlock) resetClient() {
+	t.StorageClient = util.GetBlobStorageClientWithNewHTTPClient(t.Creds.AccountName, t.Creds.AccountKey)
+}
```
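`resetClient` pairs with the retry loops: when a `PutBlock` or `PutBlockList` call fails, the target discards its storage client, and with it any pooled keep-alive connections that may be in a bad state, before the operation is retried. A self-contained sketch of this reset-on-error pattern with illustrative names, not BlobPorter's actual types:

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"time"
)

// target stands in for AzureBlock.
type target struct{ client *http.Client }

// resetClient mirrors the new AzureBlock.resetClient: build a brand-new
// HTTP client so a stale pooled connection cannot poison the next attempt.
func (t *target) resetClient() {
	t.client = &http.Client{Timeout: 600 * time.Second}
}

// putWithRetry mirrors the RetriableOperation + resetClient pairing in
// the hunks above: reset on every failure, then retry.
func (t *target) putWithRetry(op func(c *http.Client) error) error {
	var err error
	for attempt := 0; attempt < 5; attempt++ {
		if err = op(t.client); err == nil {
			return nil
		}
		t.resetClient()
	}
	return err
}

func main() {
	t := &target{client: &http.Client{}}
	err := t.putWithRetry(func(c *http.Client) error {
		return errors.New("simulated transient failure")
	})
	fmt.Println(err) // after 5 attempts: simulated transient failure
}
```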

targets/azurepage.go

Lines changed: 7 additions & 0 deletions
```diff
@@ -47,6 +47,7 @@ func (t *AzurePage) PreProcessSourceInfo(source *pipeline.SourceInfo) (err error
 	//if the max retries is exceeded, panic will happen, hence no error is returned.
 	util.RetriableOperation(func(r int) error {
 		if err := (*t.StorageClient).PutPageBlob(t.Container, (*source).TargetAlias, size, nil); err != nil {
+			t.resetClient()
 			return err
 		}
 		return nil
@@ -90,6 +91,7 @@ func (t *AzurePage) WritePart(part *pipeline.Part) (duration time.Duration, star
 			if util.Verbose {
 				fmt.Printf("EH|S|%v|%v|%v|%v\n", part.Offset, len(part.Data), part.TargetAlias, err)
 			}
+			t.resetClient()
 			return err
 		}

@@ -101,3 +103,8 @@ func (t *AzurePage) WritePart(part *pipeline.Part) (duration time.Duration, star

 	return
 }
+
+func (t *AzurePage) resetClient() {
+	client := util.GetBlobStorageClientWithNewHTTPClient(t.Creds.AccountName, t.Creds.AccountKey)
+	t.StorageClient = &client
+}
```

targets/multifile.go

Lines changed: 0 additions & 1 deletion
```diff
@@ -108,7 +108,6 @@ func (t *MultiFile) WritePart(part *pipeline.Part) (duration time.Duration, star
 	startTime = time.Now()

 	fh := <-t.FileHandles[part.TargetAlias]
-
 	if _, err := fh.WriteAt((*part).Data, int64((*part).Offset)); err != nil {
 		log.Fatal(err)
 	}
```

util/util.go

Lines changed: 32 additions & 3 deletions
```diff
@@ -290,6 +290,27 @@ func GetBlobStorageClient(accountName string, accountKey string) storage.BlobSto
 	return bc
 }

+//GetBlobStorageClientWithNewHTTPClient gets a storage client with a new instance of the HTTP client.
+func GetBlobStorageClientWithNewHTTPClient(accountName string, accountKey string) storage.BlobStorageClient {
+	var bc storage.BlobStorageClient
+	var client storage.Client
+	var err error
+
+	if accountName == "" || accountKey == "" {
+		log.Fatal("Storage account and/or key not specified via options or in environment variables ACCOUNT_NAME and ACCOUNT_KEY")
+	}
+
+	if client, err = storage.NewClient(accountName, accountKey, storage.DefaultBaseURL, LargeBlockAPIVersion, true); err != nil {
+		log.Fatal(err)
+	}
+
+	client.HTTPClient = NewHTTPClient()
+
+	bc = client.GetBlobService()
+
+	return bc
+}
+
 //GetBlobStorageClientWithSASToken gets a storage client with support for large block blobs
 func GetBlobStorageClientWithSASToken(accountName string, sasToken string) storage.BlobStorageClient {
 	var bc storage.BlobStorageClient
@@ -357,8 +378,8 @@ var storageHTTPClient *http.Client
 var HTTPClientTimeout = 600

 const (
-	maxIdleConns        = 100
-	maxIdleConnsPerHost = 100
+	maxIdleConns        = 50
+	maxIdleConnsPerHost = 50
 )

 func getStorageHTTPClient() *http.Client {
@@ -392,13 +413,21 @@ func getSuggestion(err error) string {
 		return "Try using a different container or upload and then delete a small blob with the same name."
 	case strings.Contains(err.Error(), "Client.Timeout"):
 		return "Try increasing the timeout using the -s option or reducing the number of workers and readers, options: -r and -g"
-	case strings.Contains(err.Error(),"too many open files"):
+	case strings.Contains(err.Error(), "too many open files"):
 		return "Try increasing the number of open files allowed. For debian systems you can try: ulimit -Sn 2048 "
 	default:
 		return ""
 	}
 }

+//PrintfIfDebug prints a timestamped, formatted message when Verbose is enabled.
+func PrintfIfDebug(format string, values ...interface{}) {
+	if Verbose {
+		msg := fmt.Sprintf(format, values...)
+		fmt.Printf("%v\t%s\n", time.Now(), msg)
+	}
+}
+
 //GetFileNameFromURL returns last part of URL (filename)
 func GetFileNameFromURL(sourceURI string) (string, error) {
```
