Skip to content

Commit a7d0459

Browse files
committed
- v0.5.14:
- Fixes: -- An error message is displayed instead of a panic when an invalid URL or protocol is used as source -- Large page blobs (>200GB) won't fail because of the block size check.
2 parents 7d1ca7f + bf7ea3b commit a7d0459

File tree

9 files changed

+37
-67
lines changed

9 files changed

+37
-67
lines changed

.gitignore

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@ _obj
88
_test
99
_build/linux_amd64
1010
_build/windows_amd64
11+
.vscode
1112

1213
# Architecture specific extensions/prefixes
1314
*.[568vq]
1415
[568vq].out
1516
debug
16-
.vscode
17+
launch.json
1718
*.cgo1.go
1819
*.cgo2.c
1920
_cgo_defun.c
@@ -30,4 +31,5 @@ _testmain.go
3031
*.out
3132

3233
# external packages folder
33-
#vendor/
34+
#vendor/
35+
.vscode/launch.json

.vscode/launch.json

Lines changed: 0 additions & 22 deletions
This file was deleted.

blobporter.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ const (
4949
// User can use environment variables to specify storage account information
5050
storageAccountNameEnvVar = "ACCOUNT_NAME"
5151
storageAccountKeyEnvVar = "ACCOUNT_KEY"
52-
programVersion = "0.5.13" // version number to show in help
52+
programVersion = "0.5.14" // version number to show in help
5353
)
5454

5555
const numOfWorkersFactor = 8
@@ -72,7 +72,7 @@ func init() {
7272
)
7373

7474
const (
75-
fileMsg = "URL, file or files (e.g. /data/*.gz) to upload."
75+
fileMsg = "Source URL, file or files (e.g. /data/*.gz) to upload."
7676
nameMsg = "Blob name (e.g. myblob.txt) or prefix for download scenarios."
7777
containerNameMsg = "Container name (e.g. mycontainer).\n\tIf the container does not exist, it will be created."
7878
concurrentWorkersMsg = "Number of workers for parallel upload."
@@ -93,7 +93,7 @@ func init() {
9393
)
9494

9595
flag.Usage = func() {
96-
util.PrintUsageDefaults("f", "file", "", fileMsg)
96+
util.PrintUsageDefaults("f", "source_file", "", fileMsg)
9797
util.PrintUsageDefaults("n", "name", "", nameMsg)
9898
util.PrintUsageDefaults("c", "container_name", "", containerNameMsg)
9999
util.PrintUsageDefaults("g", "concurrent_workers", strconv.Itoa(defaultNumberOfWorkers), concurrentWorkersMsg)
@@ -114,7 +114,7 @@ func init() {
114114

115115
}
116116

117-
util.StringListVarAlias(&sourceURIs, "f", "file", "", fileMsg)
117+
util.StringListVarAlias(&sourceURIs, "f", "source_file", "", fileMsg)
118118
util.StringListVarAlias(&blobNames, "n", "name", "", nameMsg)
119119
util.StringVarAlias(&containerName, "c", "container_name", "", containerNameMsg)
120120
util.IntVarAlias(&numberOfWorkers, "g", "concurrent_workers", defaultNumberOfWorkers, concurrentWorkersMsg)

pipeline/pipeline.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"crypto/md5"
55
"encoding/base64"
66
"fmt"
7-
"log"
87
"sync"
98
"time"
109

@@ -48,7 +47,7 @@ type SourceInfo struct {
4847

4948
//TargetPipeline operations that abstract how parts are written and processed to a given target
5049
type TargetPipeline interface {
51-
PreProcessSourceInfo(source *SourceInfo) (err error)
50+
PreProcessSourceInfo(source *SourceInfo, blockSize uint64) (err error)
5251
CommitList(listInfo *TargetCommittedListInfo, numberOfBlocks int, targetName string) (msg string, err error)
5352
WritePart(part *Part) (duration time.Duration, startTime time.Time, numOfRetries int, err error)
5453
ProcessWrittenPart(result *WorkerResult, listInfo *TargetCommittedListInfo) (requeue bool, err error)
@@ -154,11 +153,6 @@ func ConstructPartsPartition(numberOfPartitions int, size int64, blockSize int64
154153
//bsib := uint64(blockSize)
155154
numOfBlocks := int((size + blockSize - 1) / blockSize)
156155

157-
if numOfBlocks > util.MaxBlockCount { // more than 50,000 blocks needed, so can't work
158-
var minBlkSize = (size + util.MaxBlockCount - 1) / util.MaxBlockCount
159-
log.Fatalf("Block size is too small, minimum block size for this file would be %d bytes", minBlkSize)
160-
}
161-
162156
Partitions := make([]PartsPartition, numberOfPartitions)
163157
//the size of the partition needs to be a multiple (blockSize * int) to make sure all but the last part/block
164158
//are the same size
@@ -191,11 +185,6 @@ func ConstructPartsQueue(size uint64, blockSize uint64, sourceURI string, target
191185
var bsib = blockSize
192186
numOfBlocks = int((size + (bsib - 1)) / bsib)
193187

194-
if numOfBlocks > util.MaxBlockCount { // more than 50,000 blocks needed, so can't work
195-
var minBlkSize = (size + util.MaxBlockCount - 1) / util.MaxBlockCount
196-
log.Fatalf("Block size is too small, minimum block size for this file would be %d bytes", minBlkSize)
197-
}
198-
199188
parts = make([]Part, numOfBlocks)
200189

201190
var curFileOffset uint64

targets/azureblock.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,14 @@ func convertToStorageBlockList(list interface{}, numOfBlocks int) []storage.Bloc
8181

8282
//PreProcessSourceInfo implementation of PreProcessSourceInfo from the pipeline.TargetPipeline interface.
8383
//Checks if uncommitted blocks are present and cleans them by creating an empty blob.
84-
func (t *AzureBlock) PreProcessSourceInfo(source *pipeline.SourceInfo) (err error) {
84+
func (t *AzureBlock) PreProcessSourceInfo(source *pipeline.SourceInfo, blockSize uint64) (err error) {
85+
numOfBlocks := int(source.Size+(blockSize-1)) / int(blockSize)
86+
87+
if numOfBlocks > util.MaxBlockCount { // more than 50,000 blocks needed, so can't work
88+
var minBlkSize = (source.Size + util.MaxBlockCount - 1) / util.MaxBlockCount
89+
return fmt.Errorf("Block size is too small, minimum block size for this file would be %d bytes", minBlkSize)
90+
}
91+
8592
return util.CleanUncommittedBlocks(&t.StorageClient, t.Container, source.TargetAlias)
8693
}
8794

targets/azurepage.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,28 @@ func NewAzurePage(accountName string, accountKey string, container string) pipel
2828
return &AzurePage{Creds: &creds, Container: container, StorageClient: &client}
2929
}
3030

31-
//PageSize size of page in Azure Page Blob storage
32-
const PageSize int64 = 512
31+
//Page blobs limits and units
32+
33+
//PageSize page size for page blobs
34+
const PageSize uint64 = 512
35+
const maxPageSize uint64 = 4 * util.MB
36+
const maxPageBlobSize uint64 = 8 * util.TB
3337

3438
//PreProcessSourceInfo implementation of PreProcessSourceInfo from the pipeline.TargetPipeline interface.
3539
//initializes the page blob.
36-
func (t *AzurePage) PreProcessSourceInfo(source *pipeline.SourceInfo) (err error) {
40+
func (t *AzurePage) PreProcessSourceInfo(source *pipeline.SourceInfo, blockSize uint64) (err error) {
3741
size := int64(source.Size)
3842

39-
if size%PageSize != 0 {
40-
return fmt.Errorf("Invalid size for a page blob. The size of the file %v (%v) is not a multiple of %v", source.SourceName, source.Size, PageSize)
43+
if size%int64(PageSize) != 0 {
44+
return fmt.Errorf(" invalid size for a page blob. The size of the file %v (%v) is not a multiple of %v ", source.SourceName, source.Size, PageSize)
45+
}
46+
47+
if size > int64(maxPageBlobSize) {
48+
return fmt.Errorf(" the file %v is too big (%v). Tha maximum size of a page blob is %v ", source.SourceName, source.Size, maxPageBlobSize)
4149
}
4250

43-
if size > int64(util.TB) {
44-
return fmt.Errorf("The file %v is too big (%v). Tha maximum size of a page blob is %v ", source.SourceName, source.Size, util.TB)
51+
if blockSize > maxPageSize || blockSize < PageSize {
52+
return fmt.Errorf(" invalid block size for page blob: %v. The value must be greater than %v and less than %v", PageSize, maxPageSize)
4553
}
4654

4755
//if the max retries is exceeded, panic will happen, hence no error is returned.

targets/multifile.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func NewMultiFile(overwrite bool, numberOfHandles int) pipeline.TargetPipeline {
3232

3333
//PreProcessSourceInfo implementation of PreProcessSourceInfo from the pipeline.TargetPipeline interface.
3434
//Passthrough no need to pre-process for a file target.
35-
func (t *MultiFile) PreProcessSourceInfo(source *pipeline.SourceInfo) (err error) {
35+
func (t *MultiFile) PreProcessSourceInfo(source *pipeline.SourceInfo, blockSize uint64) (err error) {
3636
t.Lock()
3737
defer t.Unlock()
3838

transfer/transfer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -323,18 +323,18 @@ func (t *Transfer) StartTransfer(dupeLevel DupeCheckLevel, progressBarDelegate P
323323
go t.processAndCommitResults(t.ControlChannels.Results, progressBarDelegate, t.TargetPipeline, &t.SyncWaitGroups.Commits)
324324
}
325325

326-
//Sequentially calls the PreProcessSourceInfo implementation of the target pipeline for each source in the transfer.
326+
//Concurrently calls the PreProcessSourceInfo implementation of the target pipeline for each source in the transfer.
327327
func (t *Transfer) preprocessSources() {
328328
sourcesInfo := (*t.SourcePipeline).GetSourcesInfo()
329329
var wg sync.WaitGroup
330330
wg.Add(len(sourcesInfo))
331331
for i := 0; i < len(sourcesInfo); i++ {
332-
go func(s *pipeline.SourceInfo) {
332+
go func(s *pipeline.SourceInfo, b uint64) {
333333
defer wg.Done()
334-
if err := (*t.TargetPipeline).PreProcessSourceInfo(s); err != nil {
334+
if err := (*t.TargetPipeline).PreProcessSourceInfo(s, b); err != nil {
335335
log.Fatal(err)
336336
}
337-
}(&sourcesInfo[i])
337+
}(&sourcesInfo[i], t.blockSize)
338338
}
339339

340340
wg.Wait()

util/util.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -206,20 +206,6 @@ func RetriableOperation(operation func(r int) error) (duration time.Duration, st
206206
}
207207
}
208208

209-
///////////////////////////////////////////////////////////////////
210-
211-
//GetNumberOfBlocks calculates the number of blocks from filesize and checks if the number is greater than what's allowed (MaxBlockCount).
212-
func GetNumberOfBlocks(size uint64, blockSize uint64) int {
213-
numOfBlocks := int(size+(blockSize-1)) / int(blockSize)
214-
215-
if numOfBlocks > MaxBlockCount { // more than 50,000 blocks needed, so can't work
216-
var minBlkSize = (size + MaxBlockCount - 1) / MaxBlockCount
217-
log.Fatalf("Block size is too small, minimum block size for this file would be %d bytes", minBlkSize)
218-
}
219-
220-
return numOfBlocks
221-
}
222-
223209
//CreateContainerIfNotExists Creates a new container if doesn't exists. Validates the name of the container.
224210
func CreateContainerIfNotExists(container string, accountName string, accountKey string) {
225211

0 commit comments

Comments
 (0)