Skip to content

Commit 62a6136

Browse files
committed
Merge branch 'master' of https://github.com/Azure/blobporter
2 parents 02198af + 328e309 commit 62a6136

File tree

11 files changed

+48
-53
lines changed

11 files changed

+48
-53
lines changed

.gitignore

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@ _obj
88
_test
99
_build/linux_amd64
1010
_build/windows_amd64
11+
.vscode
1112

1213
# Architecture specific extensions/prefixes
1314
*.[568vq]
1415
[568vq].out
1516
debug
16-
.vscode
17+
launch.json
1718
*.cgo1.go
1819
*.cgo2.c
1920
_cgo_defun.c
@@ -30,4 +31,5 @@ _testmain.go
3031
*.out
3132

3233
# external packages folder
33-
#vendor/
34+
#vendor/
35+
.vscode/launch.json

README.md

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ Sources and targets are decoupled, this design enables the composition of variou
2929
Download, extract and set permissions:
3030

3131
```bash
32-
wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.5.13/bp_linux.tar.gz
32+
wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.5.14/bp_linux.tar.gz
3333
tar -xvf bp_linux.tar.gz linux_amd64/blobporter
3434
chmod +x ~/linux_amd64/blobporter
3535
cd ~/linux_amd64
@@ -46,7 +46,7 @@ export ACCOUNT_KEY=<STORAGE_ACCOUNT_KEY>
4646
4747
### Windows
4848

49-
Download [BlobPorter.exe](https://github.com/Azure/blobporter/releases/download/v0.5.13/bp_windows.zip)
49+
Download [BlobPorter.exe](https://github.com/Azure/blobporter/releases/download/v0.5.14/bp_windows.zip)
5050

5151
Set environment variables (if using the command prompt):
5252

@@ -84,7 +84,6 @@ If you want to rename multiple files, you can use the -n option:
8484

8585
`./blobporter -f /datadrive/f1.tar -f /datadrive/f2.md -n b1 -n b2 -c mycontainer`
8686

87-
8887
### Upload to Azure Page Blob Storage
8988

9089
Same as uploading to block blob storage, but with the transfer definition (-t option) set to file-pageblob.
@@ -132,7 +131,6 @@ By default files are downloaded to the same directory where you are running blob
132131

133132
`./blobporter -p -c mycontainer -t blob-file`
134133

135-
136134
### Download a file via HTTP to a local file
137135

138136
`./blobporter -f "http://mysource/file.bam" -n /datadrive/file.bam -t http-file`
@@ -141,7 +139,7 @@ By default files are downloaded to the same directory where you are running blob
141139
142140
## Command Options
143141

144-
- `-f`, `--file` *string* URL, file or files (e.g. /data/*.gz) to upload.
142+
- `-f`, `--source_file` *string* URL, file or files (e.g. /data/*.gz) to upload.
145143

146144
- `-c`, `--container_name` *string* container name (e.g. `mycontainer`).
147145

blobporter.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ const (
4949
// User can use environment variables to specify storage account information
5050
storageAccountNameEnvVar = "ACCOUNT_NAME"
5151
storageAccountKeyEnvVar = "ACCOUNT_KEY"
52-
programVersion = "0.5.13" // version number to show in help
52+
programVersion = "0.5.14" // version number to show in help
5353
)
5454

5555
const numOfWorkersFactor = 8
@@ -72,7 +72,7 @@ func init() {
7272
)
7373

7474
const (
75-
fileMsg = "URL, file or files (e.g. /data/*.gz) to upload."
75+
fileMsg = "Source URL, file or files (e.g. /data/*.gz) to upload."
7676
nameMsg = "Blob name (e.g. myblob.txt) or prefix for download scenarios."
7777
containerNameMsg = "Container name (e.g. mycontainer).\n\tIf the container does not exist, it will be created."
7878
concurrentWorkersMsg = "Number of workers for parallel upload."
@@ -93,7 +93,7 @@ func init() {
9393
)
9494

9595
flag.Usage = func() {
96-
util.PrintUsageDefaults("f", "file", "", fileMsg)
96+
util.PrintUsageDefaults("f", "source_file", "", fileMsg)
9797
util.PrintUsageDefaults("n", "name", "", nameMsg)
9898
util.PrintUsageDefaults("c", "container_name", "", containerNameMsg)
9999
util.PrintUsageDefaults("g", "concurrent_workers", strconv.Itoa(defaultNumberOfWorkers), concurrentWorkersMsg)
@@ -114,7 +114,7 @@ func init() {
114114

115115
}
116116

117-
util.StringListVarAlias(&sourceURIs, "f", "file", "", fileMsg)
117+
util.StringListVarAlias(&sourceURIs, "f", "source_file", "", fileMsg)
118118
util.StringListVarAlias(&blobNames, "n", "name", "", nameMsg)
119119
util.StringVarAlias(&containerName, "c", "container_name", "", containerNameMsg)
120120
util.IntVarAlias(&numberOfWorkers, "g", "concurrent_workers", defaultNumberOfWorkers, concurrentWorkersMsg)

pipeline/pipeline.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"crypto/md5"
55
"encoding/base64"
66
"fmt"
7-
"log"
87
"sync"
98
"time"
109

@@ -48,7 +47,7 @@ type SourceInfo struct {
4847

4948
//TargetPipeline operations that abstract how parts are written and processed to a given target
5049
type TargetPipeline interface {
51-
PreProcessSourceInfo(source *SourceInfo) (err error)
50+
PreProcessSourceInfo(source *SourceInfo, blockSize uint64) (err error)
5251
CommitList(listInfo *TargetCommittedListInfo, numberOfBlocks int, targetName string) (msg string, err error)
5352
WritePart(part *Part) (duration time.Duration, startTime time.Time, numOfRetries int, err error)
5453
ProcessWrittenPart(result *WorkerResult, listInfo *TargetCommittedListInfo) (requeue bool, err error)
@@ -154,11 +153,6 @@ func ConstructPartsPartition(numberOfPartitions int, size int64, blockSize int64
154153
//bsib := uint64(blockSize)
155154
numOfBlocks := int((size + blockSize - 1) / blockSize)
156155

157-
if numOfBlocks > util.MaxBlockCount { // more than 50,000 blocks needed, so can't work
158-
var minBlkSize = (size + util.MaxBlockCount - 1) / util.MaxBlockCount
159-
log.Fatalf("Block size is too small, minimum block size for this file would be %d bytes", minBlkSize)
160-
}
161-
162156
Partitions := make([]PartsPartition, numberOfPartitions)
163157
//the size of the partition needs to be a multiple (blockSize * int) to make sure all but the last part/block
164158
//are the same size
@@ -191,11 +185,6 @@ func ConstructPartsQueue(size uint64, blockSize uint64, sourceURI string, target
191185
var bsib = blockSize
192186
numOfBlocks = int((size + (bsib - 1)) / bsib)
193187

194-
if numOfBlocks > util.MaxBlockCount { // more than 50,000 blocks needed, so can't work
195-
var minBlkSize = (size + util.MaxBlockCount - 1) / util.MaxBlockCount
196-
log.Fatalf("Block size is too small, minimum block size for this file would be %d bytes", minBlkSize)
197-
}
198-
199188
parts = make([]Part, numOfBlocks)
200189

201190
var curFileOffset uint64

sources/http.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,13 @@ func getSourceSize(sourceURI string) (size int) {
6767
resp, err := client.Head(sourceURI)
6868

6969
if err != nil || resp.StatusCode != 200 {
70-
err = fmt.Errorf("HEAD request failed. Please check the URL. Status:%d Error: %v", resp.StatusCode, err)
70+
statusCode := ""
71+
if resp != nil {
72+
statusCode = fmt.Sprintf(" Status:%d ", resp.StatusCode)
73+
}
74+
err = fmt.Errorf("HEAD request failed. Please check the URL.%s Error: %v", statusCode, err)
75+
76+
util.PrintfIfDebug("getSourceSize -> %v", err)
7177

7278
size = getSourceSizeFromByteRangeHeader(sourceURI)
7379
return

sources/multifile.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,7 @@ func newMultiFilePipeline(files []string, targetAliases []string, blockSize uint
117117
if fileStat.Size() == 0 {
118118
log.Fatalf("Empty files are not allowed. The file %v is empty", files[f])
119119
}
120-
121-
numOfBlocks := util.GetNumberOfBlocks(uint64(fileStat.Size()), blockSize)
120+
numOfBlocks := int(uint64(fileStat.Size())+(blockSize-1)) / int(blockSize)
122121
totalSize = totalSize + uint64(fileStat.Size())
123122
totalNumberOfBlocks = totalNumberOfBlocks + numOfBlocks
124123

targets/azureblock.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,14 @@ func convertToStorageBlockList(list interface{}, numOfBlocks int) []storage.Bloc
8181

8282
//PreProcessSourceInfo implementation of PreProcessSourceInfo from the pipeline.TargetPipeline interface.
8383
//Checks if uncommitted blocks are present and cleans them by creating an empty blob.
84-
func (t *AzureBlock) PreProcessSourceInfo(source *pipeline.SourceInfo) (err error) {
84+
func (t *AzureBlock) PreProcessSourceInfo(source *pipeline.SourceInfo, blockSize uint64) (err error) {
85+
numOfBlocks := int(source.Size+(blockSize-1)) / int(blockSize)
86+
87+
if numOfBlocks > util.MaxBlockCount { // more than 50,000 blocks needed, so can't work
88+
var minBlkSize = (source.Size + util.MaxBlockCount - 1) / util.MaxBlockCount
89+
return fmt.Errorf("Block size is too small, minimum block size for this file would be %d bytes", minBlkSize)
90+
}
91+
8592
return util.CleanUncommittedBlocks(&t.StorageClient, t.Container, source.TargetAlias)
8693
}
8794

targets/azurepage.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,28 @@ func NewAzurePage(accountName string, accountKey string, container string) pipel
2828
return &AzurePage{Creds: &creds, Container: container, StorageClient: &client}
2929
}
3030

31-
//PageSize size of page in Azure Page Blob storage
32-
const PageSize int64 = 512
31+
//Page blobs limits and units
32+
33+
//PageSize page size for page blobs
34+
const PageSize uint64 = 512
35+
const maxPageSize uint64 = 4 * util.MB
36+
const maxPageBlobSize uint64 = 8 * util.TB
3337

3438
//PreProcessSourceInfo implementation of PreProcessSourceInfo from the pipeline.TargetPipeline interface.
3539
//initializes the page blob.
36-
func (t *AzurePage) PreProcessSourceInfo(source *pipeline.SourceInfo) (err error) {
40+
func (t *AzurePage) PreProcessSourceInfo(source *pipeline.SourceInfo, blockSize uint64) (err error) {
3741
size := int64(source.Size)
3842

39-
if size%PageSize != 0 {
40-
return fmt.Errorf("Invalid size for a page blob. The size of the file %v (%v) is not a multiple of %v", source.SourceName, source.Size, PageSize)
43+
if size%int64(PageSize) != 0 {
44+
return fmt.Errorf(" invalid size for a page blob. The size of the file %v (%v) is not a multiple of %v ", source.SourceName, source.Size, PageSize)
45+
}
46+
47+
if size > int64(maxPageBlobSize) {
48+
return fmt.Errorf(" the file %v is too big (%v). The maximum size of a page blob is %v ", source.SourceName, source.Size, maxPageBlobSize)
4149
}
4250

43-
if size > int64(util.TB) {
44-
return fmt.Errorf("The file %v is too big (%v). Tha maximum size of a page blob is %v ", source.SourceName, source.Size, util.TB)
51+
if blockSize > maxPageSize || blockSize < PageSize {
52+
return fmt.Errorf(" invalid block size for page blob: %v. The value must be greater than %v and less than %v", blockSize, PageSize, maxPageSize)
4553
}
4654

4755
//if the max retries is exceeded, panic will happen, hence no error is returned.

targets/multifile.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func NewMultiFile(overwrite bool, numberOfHandles int) pipeline.TargetPipeline {
3232

3333
//PreProcessSourceInfo implementation of PreProcessSourceInfo from the pipeline.TargetPipeline interface.
3434
//Passthrough no need to pre-process for a file target.
35-
func (t *MultiFile) PreProcessSourceInfo(source *pipeline.SourceInfo) (err error) {
35+
func (t *MultiFile) PreProcessSourceInfo(source *pipeline.SourceInfo, blockSize uint64) (err error) {
3636
t.Lock()
3737
defer t.Unlock()
3838

transfer/transfer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -323,18 +323,18 @@ func (t *Transfer) StartTransfer(dupeLevel DupeCheckLevel, progressBarDelegate P
323323
go t.processAndCommitResults(t.ControlChannels.Results, progressBarDelegate, t.TargetPipeline, &t.SyncWaitGroups.Commits)
324324
}
325325

326-
//Sequentially calls the PreProcessSourceInfo implementation of the target pipeline for each source in the transfer.
326+
//Concurrently calls the PreProcessSourceInfo implementation of the target pipeline for each source in the transfer.
327327
func (t *Transfer) preprocessSources() {
328328
sourcesInfo := (*t.SourcePipeline).GetSourcesInfo()
329329
var wg sync.WaitGroup
330330
wg.Add(len(sourcesInfo))
331331
for i := 0; i < len(sourcesInfo); i++ {
332-
go func(s *pipeline.SourceInfo) {
332+
go func(s *pipeline.SourceInfo, b uint64) {
333333
defer wg.Done()
334-
if err := (*t.TargetPipeline).PreProcessSourceInfo(s); err != nil {
334+
if err := (*t.TargetPipeline).PreProcessSourceInfo(s, b); err != nil {
335335
log.Fatal(err)
336336
}
337-
}(&sourcesInfo[i])
337+
}(&sourcesInfo[i], t.blockSize)
338338
}
339339

340340
wg.Wait()

util/util.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -206,20 +206,6 @@ func RetriableOperation(operation func(r int) error) (duration time.Duration, st
206206
}
207207
}
208208

209-
///////////////////////////////////////////////////////////////////
210-
211-
//GetNumberOfBlocks calculates the number of blocks from filesize and checks if the number is greater than what's allowed (MaxBlockCount).
212-
func GetNumberOfBlocks(size uint64, blockSize uint64) int {
213-
numOfBlocks := int(size+(blockSize-1)) / int(blockSize)
214-
215-
if numOfBlocks > MaxBlockCount { // more than 50,000 blocks needed, so can't work
216-
var minBlkSize = (size + MaxBlockCount - 1) / MaxBlockCount
217-
log.Fatalf("Block size is too small, minimum block size for this file would be %d bytes", minBlkSize)
218-
}
219-
220-
return numOfBlocks
221-
}
222-
223209
//CreateContainerIfNotExists Creates a new container if doesn't exists. Validates the name of the container.
224210
func CreateContainerIfNotExists(container string, accountName string, accountKey string) {
225211

0 commit comments

Comments
 (0)