Skip to content

Commit 6d086ab

Browse files
committed
- client resets
1 parent 523db8c commit 6d086ab

File tree

4 files changed

+50
-13
lines changed

4 files changed

+50
-13
lines changed

blobporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ const (
4949
// User can use environment variables to specify storage account information
5050
storageAccountNameEnvVar = "ACCOUNT_NAME"
5151
storageAccountKeyEnvVar = "ACCOUNT_KEY"
52-
programVersion = "0.5.10" // version number to show in help
52+
programVersion = "0.5.11a" // version number to show in help
5353
)
5454

5555
const numOfWorkersFactor = 8

targets/azureblock.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func (t *AzureBlock) CommitList(listInfo *pipeline.TargetCommittedListInfo, numb
5858
//if the max retries is exceeded, panic will happen, hence no error is returned.
5959
duration, _, _ := util.RetriableOperation(func(r int) error {
6060
if err := t.StorageClient.PutBlockList(t.Container, targetName, blockList); err != nil {
61+
t.resetClient()
6162
return err
6263
}
6364
return nil
@@ -126,9 +127,8 @@ func (t *AzureBlock) WritePart(part *pipeline.Part) (duration time.Duration, sta
126127
uint64(len(part.Data)),
127128
bytes.NewReader(part.Data),
128129
headers); err != nil {
129-
if util.Verbose {
130-
fmt.Printf("EH|S|%v|%v|%v|%v\n", (*part).BlockID, len((*part).Data), (*part).TargetAlias, err)
131-
}
130+
util.PrintfIfDebug("Error|S|%v|%v|%v|%v", (*part).BlockID, len((*part).Data), (*part).TargetAlias, err)
131+
t.resetClient()
132132
return err
133133
}
134134
return nil
@@ -140,16 +140,17 @@ func (t *AzureBlock) WritePart(part *pipeline.Part) (duration time.Duration, sta
140140
uint64(len(part.Data)),
141141
part.NewBuffer(),
142142
headers); err != nil {
143-
if util.Verbose {
144-
fmt.Printf("EH|S|%v|%v|%v|%v\n", (*part).BlockID, len((*part).Data), (*part).TargetAlias, err)
145-
}
143+
util.PrintfIfDebug("Error|S|%v|%v|%v|%v", (*part).BlockID, len((*part).Data), (*part).TargetAlias, err)
144+
t.resetClient()
146145
return err
147146
}
148147

149-
if util.Verbose {
150-
fmt.Printf("OKA|S|%v|%v|%v|%v\n", (*part).BlockID, len((*part).Data), (*part).TargetAlias, err)
151-
}
148+
util.PrintfIfDebug("OK|S|%v|%v|%v|%v", (*part).BlockID, len((*part).Data), (*part).TargetAlias, err)
152149
return nil
153150
})
154151
return
155152
}
153+
154+
func (t *AzureBlock) resetClient() {
155+
t.StorageClient = util.GetBlobStorageClientWithNewHTTPClient(t.Creds.AccountName, t.Creds.AccountKey)
156+
}

targets/azurepage.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func (t *AzurePage) PreProcessSourceInfo(source *pipeline.SourceInfo) (err error
4747
//if the max retries is exceeded, panic will happen, hence no error is returned.
4848
util.RetriableOperation(func(r int) error {
4949
if err := (*t.StorageClient).PutPageBlob(t.Container, (*source).TargetAlias, size, nil); err != nil {
50+
t.resetClient()
5051
return err
5152
}
5253
return nil
@@ -90,6 +91,7 @@ func (t *AzurePage) WritePart(part *pipeline.Part) (duration time.Duration, star
9091
if util.Verbose {
9192
fmt.Printf("EH|S|%v|%v|%v|%v\n", part.Offset, len(part.Data), part.TargetAlias, err)
9293
}
94+
t.resetClient()
9395
return err
9496
}
9597

@@ -101,3 +103,8 @@ func (t *AzurePage) WritePart(part *pipeline.Part) (duration time.Duration, star
101103

102104
return
103105
}
106+
107+
func (t *AzurePage) resetClient() {
108+
client := util.GetBlobStorageClientWithNewHTTPClient(t.Creds.AccountName, t.Creds.AccountKey)
109+
t.StorageClient = &client
110+
}

util/util.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,27 @@ func GetBlobStorageClient(accountName string, accountKey string) storage.BlobSto
290290
return bc
291291
}
292292

293+
//GetBlobStorageClientWithNewHTTPClient gets a storage client with a new instace of the HTTP client.
294+
func GetBlobStorageClientWithNewHTTPClient(accountName string, accountKey string) storage.BlobStorageClient {
295+
var bc storage.BlobStorageClient
296+
var client storage.Client
297+
var err error
298+
299+
if accountName == "" || accountKey == "" {
300+
log.Fatal("Storage account and/or key not specified via options or in environment variables ACCOUNT_NAME and ACCOUNT_KEY")
301+
}
302+
303+
if client, err = storage.NewClient(accountName, accountKey, storage.DefaultBaseURL, LargeBlockAPIVersion, true); err != nil {
304+
log.Fatal(err)
305+
}
306+
307+
client.HTTPClient = NewHTTPClient()
308+
309+
bc = client.GetBlobService()
310+
311+
return bc
312+
}
313+
293314
//GetBlobStorageClientWithSASToken gets a storage client with support for large block blobs
294315
func GetBlobStorageClientWithSASToken(accountName string, sasToken string) storage.BlobStorageClient {
295316
var bc storage.BlobStorageClient
@@ -357,8 +378,8 @@ var storageHTTPClient *http.Client
357378
var HTTPClientTimeout = 600
358379

359380
const (
360-
maxIdleConns = 100
361-
maxIdleConnsPerHost = 100
381+
maxIdleConns = 50
382+
maxIdleConnsPerHost = 50
362383
)
363384

364385
func getStorageHTTPClient() *http.Client {
@@ -392,13 +413,21 @@ func getSuggestion(err error) string {
392413
return "Try using a different container or upload and then delete a small blob with the same name."
393414
case strings.Contains(err.Error(), "Client.Timeout"):
394415
return "Try increasing the timeout using the -s option or reducing the number of workers and readers, options: -r and -g"
395-
case strings.Contains(err.Error(),"too many open files"):
416+
case strings.Contains(err.Error(), "too many open files"):
396417
return "Try increasing the number of open files allowed. For debian systems you can try: ulimit -Sn 2048 "
397418
default:
398419
return ""
399420
}
400421
}
401422

423+
//PrintfIfDebug TODO
424+
func PrintfIfDebug(format string, values ...interface{}) {
425+
if Verbose {
426+
msg := fmt.Sprintf(format, values...)
427+
fmt.Printf("%v\t%s\n", time.Now(), msg)
428+
}
429+
}
430+
402431
//GetFileNameFromURL returns last part of URL (filename)
403432
func GetFileNameFromURL(sourceURI string) (string, error) {
404433

0 commit comments

Comments
 (0)