Commit ca53ba8

Merge pull request #61 from Azure/dev
504 release
2 parents: 640d1d7 + c44f8d6

7 files changed: 89 additions, 27 deletions

README.md

Lines changed: 1 addition & 1 deletion

@@ -135,7 +135,7 @@ Without the -n option all files in the container will be downloaded.
 
 ## Command Options
 
-- `-f`, `--file` *string* URL, file or files (e.g. /data/*.gz) to upload. Destination file for download scenarios.
+- `-f`, `--file` *string* URL, file or files (e.g. /data/*.gz) to upload.
 
 - `-c`, `--container_name` *string* container name (e.g. `mycontainer`).
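For reference, a typical upload combining these two options might look like blobporter -f "/data/*.gz" -c mycontainer; this is a hypothetical invocation, assuming the ACCOUNT_NAME and ACCOUNT_KEY environment variables are set and the transfer type is left at its default.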

blobporter.go

Lines changed: 8 additions & 5 deletions

@@ -41,12 +41,13 @@ var storageAccountKey string
 var storageClientHTTPTimeout int
 var quietMode bool
 var calculateMD5 bool
+var exactNameMatch bool
 
 const (
 	// User can use environment variables to specify storage account information
 	storageAccountNameEnvVar = "ACCOUNT_NAME"
 	storageAccountKeyEnvVar  = "ACCOUNT_KEY"
-	programVersion           = "0.5.03" // version number to show in help
+	programVersion           = "0.5.04" // version number to show in help
 )
 
 const numOfWorkersFactor = 9
@@ -68,8 +69,8 @@ func init() {
 	)
 
 	const (
-		fileMsg = "URL, file or files (e.g. /data/*.gz) to upload.\n\tDestination file for download."
-		nameMsg = "Blob name for upload or download scenarios from Azure Blob Storage.\n\tDestination file name for downloads from a URL."
+		fileMsg = "URL, file or files (e.g. /data/*.gz) to upload."
+		nameMsg = "Blob name (e.g. myblob.txt) or prefix for download scenarios."
 		containerNameMsg = "Container name (e.g. mycontainer).\n\tIf the container does not exist, it will be created."
 		concurrentWorkersMsg = "Number of workers for parallel upload."
 		concurrentReadersMsg = "Number of readers for parallel reading of the input file(s)."
@@ -82,6 +83,7 @@ func init() {
 		accountKeyMsg = "Storage account key string.\n\tCan also be specified via the " + storageAccountKeyEnvVar + " environment variable."
 		dupcheckLevelMsg = "Desired level of effort to detect duplicate data to minimize upload size.\n\tMust be one of " + transfer.DupeCheckLevelStr
 		transferDefMsg = "Defines the type of source and target in the transfer.\n\tMust be one of:\n\tfile-blockblob, file-pageblob, http-blockblob, http-pageblob, blob-file,\n\tpageblock-file (alias of blob-file), blockblob-file (alias of blob-file)\n\tor http-file."
+		exactNameMatchMsg = "If set or true only blobs that match the name exactly will be downloaded."
 	)
 
 	flag.Usage = func() {
@@ -99,6 +101,7 @@ func init() {
 		util.PrintUsageDefaults("k", "account_key", "", accountKeyMsg)
 		util.PrintUsageDefaults("d", "dup_check_level", dedupeLevelOptStr, dupcheckLevelMsg)
 		util.PrintUsageDefaults("t", "transfer_definition", string(defaultTransferDef), transferDefMsg)
+		util.PrintUsageDefaults("e", "exact_name", "false", exactNameMatchMsg)
 	}
 
 	util.StringListVarAlias(&sourceURIs, "f", "file", "", fileMsg)
@@ -115,7 +118,7 @@ func init() {
 	util.StringVarAlias(&storageAccountKey, "k", "account_key", "", accountKeyMsg)
 	util.StringVarAlias(&dedupeLevelOptStr, "d", "dup_check_level", dedupeLevelOptStr, dupcheckLevelMsg)
 	util.StringVarAlias(&transferDefStr, "t", "transfer_definition", string(defaultTransferDef), transferDefMsg)
-
+	util.BoolVarAlias(&exactNameMatch, "e", "exact_name", false, exactNameMatchMsg)
 }
 
 var dataTransferred uint64
@@ -236,7 +239,7 @@ func getPipelines() (pipeline.SourcePipeline, pipeline.TargetPipeline) {
 			blobNames = []string{""}
 		}
 
-		source = sources.NewAzureBlob(containerName, blobNames, storageAccountName, storageAccountKey, calculateMD5)
+		source = sources.NewAzureBlob(containerName, blobNames, storageAccountName, storageAccountKey, calculateMD5, exactNameMatch)
 		target = targets.NewMultiFile(true, numberOfWorkers)
 
 	case transfer.HTTPToFile:
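Taken together, these changes add the -e / --exact_name switch and thread the flag through to the Azure Blob source used for downloads. As a rough illustration (a hypothetical invocation, not taken from the project docs), downloading exactly one blob rather than every blob sharing its prefix might look like blobporter -t blob-file -c mycontainer -n myblob.txt -e; without -e, the -n value keeps acting as a prefix.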

sources/azureblob.go

Lines changed: 49 additions & 15 deletions

@@ -20,17 +20,22 @@ import (
 
 //AzureBlob constructs parts channel and implements data readers for Azure Blobs exposed via HTTP
 type AzureBlob struct {
-	HTTPSource    HTTPPipeline
-	Container     string
-	BlobNames     []string
-	storageClient storage.BlobStorageClient
+	HTTPSource     HTTPPipeline
+	Container      string
+	BlobNames      []string
+	exactNameMatch bool
+	storageClient  storage.BlobStorageClient
 }
 
 //NewAzureBlob creates a new instance of HTTPPipeline
 //Creates a SASURI to the blobName with an expiration of sasTokenNumberOfHours.
 //blobName is used as the target alias.
-func NewAzureBlob(container string, blobNames []string, accountName string, accountKey string, md5 bool) pipeline.SourcePipeline {
-	azureSource := AzureBlob{Container: container, BlobNames: blobNames, storageClient: util.GetBlobStorageClient(accountName, accountKey)}
+func NewAzureBlob(container string, blobNames []string, accountName string, accountKey string, md5 bool, useExactNameMatch bool) pipeline.SourcePipeline {
+	azureSource := AzureBlob{Container: container,
+		BlobNames:      blobNames,
+		storageClient:  util.GetBlobStorageClient(accountName, accountKey),
+		exactNameMatch: useExactNameMatch}
+
 	if sourceURIs, err := azureSource.getSourceURIs(); err == nil {
 		httpSource := NewHTTP(sourceURIs, nil, md5)
 		azureSource.HTTPSource = httpSource.(HTTPPipeline)
@@ -73,17 +78,29 @@ func (f AzureBlob) getSourceURIs() ([]string, error) {
 	var sourceURI string
 	for _, blobList := range blobLists {
 		for _, blob := range blobList.Blobs {
-			sourceURI, err = f.storageClient.GetBlobSASURI(f.Container, blob.Name, date, "r")
 
-			if err != nil {
-				return nil, err
+			include := true
+			if f.exactNameMatch {
+				include = blob.Name == blobList.Prefix
+			}
+
+			if include {
+				sourceURI, err = f.storageClient.GetBlobSASURI(f.Container, blob.Name, date, "r")
+
+				if err != nil {
+					return nil, err
+				}
+				sourceURIs = append(sourceURIs, sourceURI)
 			}
-			sourceURIs = append(sourceURIs, sourceURI)
 		}
 	}
 
 	if len(sourceURIs) == 0 {
-		return nil, fmt.Errorf("the prefix %v (-n option) did not match any blob names", f.BlobNames)
+		nameMatchMode := "prefix"
+		if f.exactNameMatch {
+			nameMatchMode = "name"
+		}
+		return nil, fmt.Errorf("the %v %s did not match any blob names", nameMatchMode, f.BlobNames)
 	}
 
 	return sourceURIs, nil
@@ -100,12 +117,29 @@ func (f AzureBlob) getBlobLists() ([]storage.BlobListResponse, error) {
 	listOfLists := make([]storage.BlobListResponse, listLength)
 
 	for i, blobname := range f.BlobNames {
+		var list *storage.BlobListResponse
 		params := storage.ListBlobsParameters{Prefix: blobname}
-		var list storage.BlobListResponse
-		if list, err = f.storageClient.ListBlobs(f.Container, params); err != nil {
-			return nil, err
+
+		for {
+			var listpage storage.BlobListResponse
+			if listpage, err = f.storageClient.ListBlobs(f.Container, params); err != nil {
+				return nil, err
+			}
+
+			if list == nil {
+				list = &listpage
+			} else {
+				(*list).Blobs = append((*list).Blobs, listpage.Blobs...)
+			}
+
+			if listpage.NextMarker == "" {
+				break
+			}
+
+			params.Marker = listpage.NextMarker
 		}
-		listOfLists[i] = list
+
+		listOfLists[i] = *list
 	}
 
 	return listOfLists, nil
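The heart of this change is the paging loop in getBlobLists: ListBlobs returns a single page of results, and the NextMarker of each response has to be passed back as the Marker of the next request until it comes back empty, otherwise containers with many blobs are only partially listed. Below is a minimal, self-contained sketch of that pattern, using the same github.com/Azure/azure-sdk-for-go/storage client as the code above; listAllBlobs is a hypothetical helper, not part of this repo.

package example

import "github.com/Azure/azure-sdk-for-go/storage"

// listAllBlobs collects every blob whose name starts with prefix, following
// NextMarker from page to page until the service reports no further results.
func listAllBlobs(client storage.BlobStorageClient, container string, prefix string) ([]storage.Blob, error) {
	var all []storage.Blob
	params := storage.ListBlobsParameters{Prefix: prefix}

	for {
		page, err := client.ListBlobs(container, params)
		if err != nil {
			return nil, err
		}

		all = append(all, page.Blobs...)

		// An empty NextMarker means this was the last page.
		if page.NextMarker == "" {
			break
		}

		// Resume the listing where this page left off.
		params.Marker = page.NextMarker
	}

	return all, nil
}

getBlobLists keeps the same shape but accumulates into a *storage.BlobListResponse so the rest of the pipeline can keep consuming one BlobListResponse per requested name.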

targets/azureblock.go

Lines changed: 2 additions & 2 deletions

@@ -70,9 +70,9 @@ func convertToStorageBlockList(list interface{}, numOfBlocks int) []storage.Bloc
 }
 
 //PreProcessSourceInfo implementation of PreProcessSourceInfo from the pipeline.TargetPipeline interface.
-//Passthrough no need to pre-process for blob blocks.
+//Checks if uncommitted blocks are present and cleans them by creating an empty blob.
 func (t AzureBlock) PreProcessSourceInfo(source *pipeline.SourceInfo) (err error) {
-	return nil
+	return util.CleanUncommittedBlocks(&t.StorageClient, t.Container, source.TargetAlias)
 }
 
 //ProcessWrittenPart implements ProcessWrittenPart from the pipeline.TargetPipeline interface.

targets/azurepage.go

Lines changed: 1 addition & 1 deletion

@@ -32,7 +32,7 @@ func NewAzurePage(accountName string, accountKey string, container string) pipel
 const PageSize int64 = 512
 
 //PreProcessSourceInfo implementation of PreProcessSourceInfo from the pipeline.TargetPipeline interface.
-//Passthrough no need to pre-process for blob blocks.
+//Initializes the page blob.
 func (t AzurePage) PreProcessSourceInfo(source *pipeline.SourceInfo) (err error) {
 	size := int64(source.Size)

transfer/transfer_test.go

Lines changed: 2 additions & 2 deletions

@@ -187,7 +187,7 @@ func TestFileToBlobToFile(t *testing.T) {
 	tfer.WaitForCompletion()
 
 	ap = targets.NewMultiFile(true, numOfWorkers)
-	fp = sources.NewAzureBlob(container, []string{sourceFile}, accountName, accountKey, true)
+	fp = sources.NewAzureBlob(container, []string{sourceFile}, accountName, accountKey, true, false)
 	tfer = NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize)
 	tfer.StartTransfer(None, delegate)
 	tfer.WaitForCompletion()
@@ -207,7 +207,7 @@ func TestFileToBlobToFileWithAlias(t *testing.T) {
 	tfer.WaitForCompletion()
 
 	ap = targets.NewMultiFile(true, numOfWorkers)
-	fp = sources.NewAzureBlob(container, []string{alias}, accountName, accountKey, true)
+	fp = sources.NewAzureBlob(container, []string{alias}, accountName, accountKey, true, false)
 	tfer = NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize)
 	tfer.StartTransfer(None, delegate)
 	tfer.WaitForCompletion()

util/util.go

Lines changed: 26 additions & 1 deletion

@@ -171,7 +171,7 @@ func BoolVarAlias(varPtr *bool, shortflag string, longflag string, defaultVal bo
 // Retriable execution of a function
 ///////////////////////////////////////////////////////////////////
 
-const retryLimit = 50 // max retries for an operation in retriableOperation
+const retryLimit = 100 // max retries for an operation in retriableOperation
 const retrySleepDuration = time.Millisecond * 200 // Retry wait interval in retriableOperation
 
 //RetriableOperation executes a function, retrying up to "retryLimit" times and waiting "retrySleepDuration" between attempts
@@ -244,6 +244,31 @@ func CreateContainerIfNotExists(container string, accountName string, accountKey
 
 }
 
+//CleanUncommittedBlocks clears uncommitted blocks if a committed blob with the same name does not already exist.
+func CleanUncommittedBlocks(client *storage.BlobStorageClient, container string, blobName string) error {
+	list, _ := client.GetBlockList(container, blobName, "Uncommitted")
+
+	if len(list.UncommittedBlocks) == 0 {
+		return nil
+	}
+
+	fmt.Printf("Warning! uncommitted blocks detected for blob %v \nAttempting to clean them up\n", blobName)
+
+	exists, err := client.BlobExists(container, blobName)
+
+	if exists {
+		fmt.Printf("Can't delete uncommitted blocks for the blob:%v. A committed blob with the same name already exists \n", blobName)
+		return nil
+	}
+
+	if err != nil {
+		return err
+	}
+
+	return client.CreateBlockBlob(container, blobName)
+
+}
+
 //GetBlobStorageClient gets a storage client with support for large block blobs
 func GetBlobStorageClient(accountName string, accountKey string) storage.BlobStorageClient {
 	var bc storage.BlobStorageClient
