
Commit e032b5c

v0522

1 parent 549d9d4 commit e032b5c

7 files changed: +184 −58 lines changed


LICENSE

Lines changed: 8 additions & 0 deletions

```diff
@@ -31,3 +31,11 @@ Unless required by applicable law or agreed to in writing, software distributed
 See the License for specific language governing permissions and limitations under the License.
 --------------------------------------------------------End of License---------------------------------------------------------------------------------------
 
+Minio Client SDK for Go
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License. You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and limitations under the License.
```

README.md

Lines changed: 40 additions & 12 deletions

````diff
@@ -15,12 +15,12 @@ Sources and targets are decoupled, this design enables the composition of various
 | --------------- | -----------------| -----------------|-----------------|
 | File (Upload) | Yes | Yes | NA |
 | HTTP/HTTPS* | Yes | Yes | Yes |
-| Azure Block Blob | Yes** | Yes** | Yes |
-| Azure Page Blob | Yes** | Yes** | Yes |
+| Azure Block Blob | Yes | Yes | Yes |
+| Azure Page Blob | Yes | Yes | Yes |
+| S3 Endpoint | Yes | Yes | No |
 
 *\* The HTTP/HTTPS source must support HTTP byte ranges and return the file size as a response to a HTTP HEAD request.*
 
-*\*\* Using the Blob URL with a valid SAS Token with read access .*
 
 ## Getting Started
 
@@ -29,7 +29,7 @@ Sources and targets are decoupled, this design enables the composition of various
 Download, extract and set permissions:
 
 ```bash
-wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.5.14/bp_linux.tar.gz
+wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.5.22/bp_linux.tar.gz
 tar -xvf bp_linux.tar.gz linux_amd64/blobporter
 chmod +x ~/linux_amd64/blobporter
 cd ~/linux_amd64
@@ -46,7 +46,7 @@ export ACCOUNT_KEY=<STORAGE_ACCOUNT_KEY>
 
 ### Windows
 
-Download [BlobPorter.exe](https://github.com/Azure/blobporter/releases/download/v0.5.14/bp_windows.zip)
+Download [BlobPorter.exe](https://github.com/Azure/blobporter/releases/download/v0.5.22/bp_windows.zip)
 
 Set environment variables (if using the command prompt):
 
@@ -94,6 +94,25 @@ For example, a single file upload to page blob:
 
 >Note: The file size and block size must be a multiple of 512 (bytes). The maximum block size is 4MB.
 
+### Upload from an S3 compatible endpoint to Azure Blob Storage
+
+You can upload data directly from an S3 compatible endpoint. First, you must specify the access and secret keys via environment variables.
+
+```bash
+export S3_ACCESS_KEY=<YOUR_ACCESS_KEY>
+export S3_SECRET_KEY=<YOUR_SECRET_KEY>
+```
+
+Then you can specify an S3 URI with the following format:
+
+[URL]/[BUCKET][PREFIX]
+
+For example:
+
+`./blobporter -f s3://s3.amazonaws.com/bpperftest/mydata -c froms3 -t s3-blockblob -p`
+
+>Note: It is recommended to perform this operation from a VM running in the cloud. This is a network bound operation where data is uploaded as it is received from the source.
+
 ### Upload from an HTTP/HTTPS source to Azure Blob Storage
 
 To block blob storage:
@@ -104,13 +123,22 @@ To page blob storage:
 
 `./blobporter -f "http://mysource/my.vhd" -c mycontainer -n my.vhd -t http-pageblob`
 
-You can use this approach to transfer data between Azure Storage accounts and blob types - e.g. transfer a blob from one account to another or from a page blob to block blob.
 
-The source is a page blob with a SAS token and the target is block blob:
+### Synchronously Copy data in Azure Blob Storage
+
+You can synchronously transfer data between Azure Storage accounts, containers and blob types.
+
+First, you must set the account key of the source storage account.
+
+```bash
+export SOURCE_ACCOUNT_KEY=<YOUR_KEY>
+```
+
+Then you can specify the URI of the source. Prefixes are supported.
 
-`./blobporter -f "https://myaccount.blob.core.windows.net/vhds/my.vhd?st=2015-03-23T03%3A59%3A00Z&se=2015-03-24T03%3A59%3A00Z&sp=rl&sv=2015-12-11&sr=b&sig=123rfdAsYyqOxxOGe28%3Fp4H6r5reR8pdSBdlchi64wgs3D" -c mycontainer -n my.vhd -t http-blockblob`
+`./blobporter -f "https://mysourceaccount.blob.core.windows.net/container/myblob" -c mycontainer -t blob-blockblob`
 
->Note: In HTTP/HTTPS to blob transfers, data is downloaded and uploaded as it is received without disk IO.
+>Note: It is recommended to perform this operation from a VM running in the same region as the source or the target. As with all HTTP based transfers, data is uploaded as it is downloaded from the source, therefore the transfer is primarily network bound.
 
 ### Download from Azure Blob Storage
 
@@ -139,7 +167,7 @@ By default files are downloaded to the same directory where you are running blobporter
 
 ## Command Options
 
-- `-f`, `--source_file` *string* URL, file or files (e.g. /data/*.gz) to upload.
+- `-f`, `--source_file` *string* URL, Azure Blob or S3 Endpoint, file or files (e.g. /data/*.gz) to upload.
 
 - `-c`, `--container_name` *string* container name (e.g. `mycontainer`).
 
@@ -167,7 +195,7 @@ By default files are downloaded to the same directory where you are running blobporter
 
 - `-d`, `--dup_check_level` *string* desired level of effort to detect duplicate data blocks to minimize upload size. Must be one of None, ZeroOnly, Full (default "None")
 
-- `-t`, `--transfer_type` *string* defines the source and target of the transfer. Must be one of file-blockblob, file-pageblob, http-blockblob, http-pageblob, blob-file, pageblock-file (alias of blob-file), blockblob-file (alias of blob-file) or http-file
+- `-t`, `--transfer_type` *string* defines the source and target of the transfer. Must be one of file-blockblob, file-pageblob, http-blockblob, http-pageblob, blob-file, pageblock-file (alias of blob-file), blockblob-file (alias of blob-file), http-file, blob-pageblob, blob-blockblob, s3-pageblob and s3-blockblob.
 
 - `-m`, `--compute_blockmd5` *bool* if present or true, block level MD5 hash will be computed and included as a header when the block is sent to blob storage. Default is false.
 
@@ -189,7 +217,7 @@ By default, BlobPorter creates 5 readers and 8 workers for each core on the computer
 
 - For transfers from fast disks (SSD) or HTTP sources, reducing the number of readers or workers could provide better performance than the default values. Reduce these values if you want to minimize resource utilization. Lowering these numbers reduces contention and the likelihood of experiencing throttling conditions.
 
-- Transfers can be batched. Each batch transfer will concurrently read and transfer up to 200 files (default value) from the source. The batch size can be modified using the -x option, the maximum value is 500.
+- Transfers can be batched. Each batch transfer will concurrently read and transfer up to 500 files (default value) from the source. The batch size can be modified using the -x option.
 
 - Blobs smaller than the block size are transferred in a single operation. With relatively small files (<32MB) performance may be better if you set a block size equal to the size of the files. Setting the number of workers and readers to the number of files could yield performance gains.
 
````
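
The new S3 and synchronous-copy paths share the same credential-by-environment pattern. As a worked example, here is a hypothetical end-to-end synchronous copy assembled from the commands above (the account, container, and key values are placeholders, not values from this commit):

```bash
# Target storage account credentials (placeholders).
export ACCOUNT_NAME=mytargetaccount
export ACCOUNT_KEY=<TARGET_STORAGE_ACCOUNT_KEY>

# Key for the storage account that owns the source blobs.
export SOURCE_ACCOUNT_KEY=<SOURCE_STORAGE_ACCOUNT_KEY>

# Copy every blob under the source container into the target container
# as block blobs; prefixes in the source URI are supported.
./blobporter -f "https://mysourceaccount.blob.core.windows.net/mycontainer" \
  -c targetcontainer -t blob-blockblob
```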

blobporter.go

Lines changed: 2 additions & 2 deletions

```diff
@@ -54,12 +54,12 @@ const (
 	s3AccessKeyEnvVar = "S3_ACCESS_KEY"
 	s3SecretKeyEnvVar = "S3_SECRET_KEY"
 
-	programVersion = "0.5.21" // version number to show in help
+	programVersion = "0.5.22" // version number to show in help
 )
 
 const numOfWorkersFactor = 8
 const numOfReadersFactor = 5
-const defaultNumberOfFilesInBatch = 200
+const defaultNumberOfFilesInBatch = 500
 const defaultNumberOfHandlesPerFile = 2
 
 func init() {
```
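
The constants above name the environment variables that carry the S3 credentials used by the new s3-* transfer types. As a minimal sketch of how a program can read and validate them at startup (the helper below is illustrative, not code from this repository):

```go
package main

import (
	"fmt"
	"os"
)

// Environment variable names, matching the constants in blobporter.go.
const (
	s3AccessKeyEnvVar = "S3_ACCESS_KEY"
	s3SecretKeyEnvVar = "S3_SECRET_KEY"
)

// s3Credentials reads both keys and fails fast when either is unset, so a
// transfer is rejected before any data starts moving.
func s3Credentials() (access, secret string, err error) {
	access = os.Getenv(s3AccessKeyEnvVar)
	secret = os.Getenv(s3SecretKeyEnvVar)
	if access == "" || secret == "" {
		return "", "", fmt.Errorf("%s and %s must both be set", s3AccessKeyEnvVar, s3SecretKeyEnvVar)
	}
	return access, secret, nil
}

func main() {
	if _, _, err := s3Credentials(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	fmt.Println("S3 credentials found")
}
```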

inttest._m.sh

Lines changed: 10 additions & 1 deletion

```diff
@@ -100,4 +100,13 @@ dd if=/dev/urandom of=$F1 bs=36M count=1 iflag=fullblock
 
 #Download file
 ./blobporter -n $DOWN_F1 -c $CONT -t blob-file -b 32MB
-calculateMD5 $F1 $DOWN_F1
+calculateMD5 $F1 $DOWN_F1
+
+
+#Scenario 9 - Synchronous copy from one container to another in the same storage account.
+CONT2="syncopy"
+SRC_URL="https://"$ACCOUNT_NAME".blob.core.windows.net/"$CONT
+SRC_ACCOUNT_KEY=$ACCOUNT_KEY
+
+./blobporter -f $SRC_URL -c $CONT2 -t blob-blockblob
+
```

sources/http.go

Lines changed: 8 additions & 3 deletions

```diff
@@ -1,7 +1,7 @@
 package sources
 
 import (
-	"io"
+	"io/ioutil"
 	"log"
 	"strconv"
 	"strings"
@@ -201,12 +201,17 @@ func (f *HTTPPipeline) ExecuteReader(partitionsQ chan pipeline.PartsPartition, p
 			err = fmt.Errorf("Invalid status code in the response. Status: %v Bytes: %v", status, header)
 		}
 
+		if res != nil && res.Body != nil {
+			res.Body.Close()
+		}
+		f.HTTPClient = util.NewHTTPClient()
+
 		util.PrintfIfDebug("ExecuteReader -> |%v|%v|%v|%v|%v", p.BlockID, p.BytesToRead, status, err, header)
 
 		return err
 	}
-	//p.Data, err = ioutil.ReadAll(res.Body)
-	_, err = io.ReadFull(res.Body, p.Data[:p.BytesToRead])
+	p.Data, err = ioutil.ReadAll(res.Body)
+	//_, err = io.ReadFull(res.Body, p.Data[:p.BytesToRead])
 
 	res.Body.Close()
 	if err != nil {
```
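
The second hunk does two things: on a bad status it closes the response body and swaps in a fresh HTTP client before the retry, and it reads the body with `ioutil.ReadAll` instead of `io.ReadFull`. A self-contained sketch of that reset-on-error pattern, with simplified function and helper names that are illustrative rather than BlobPorter's actual API:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
)

// newHTTPClient builds a fresh client, mirroring the idea behind the
// util.NewHTTPClient call in the commit (this helper is an assumption,
// simplified from whatever the real one configures).
func newHTTPClient() *http.Client {
	return &http.Client{Timeout: 60 * time.Second}
}

// readChunk issues a ranged GET. On a non-2xx status it closes the body and
// hands back a brand-new client so the next attempt does not reuse a
// connection left in a bad state.
func readChunk(client *http.Client, url string, offset, size int64) ([]byte, *http.Client, error) {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, client, err
	}
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+size-1))

	res, err := client.Do(req)
	if err != nil {
		return nil, newHTTPClient(), err
	}

	if res.StatusCode < 200 || res.StatusCode > 299 {
		// Close the body and discard the client, as the commit does,
		// so the next attempt starts from a clean connection.
		if res.Body != nil {
			res.Body.Close()
		}
		return nil, newHTTPClient(), fmt.Errorf("invalid status code in the response: %v", res.Status)
	}

	defer res.Body.Close()
	// ReadAll tolerates responses whose length differs from the requested
	// range, which is what the switch away from io.ReadFull buys.
	data, err := ioutil.ReadAll(res.Body)
	return data, client, err
}

func main() {
	client := newHTTPClient()
	data, _, err := readChunk(client, "https://example.com/file.bin", 0, 1024)
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Printf("read %d bytes\n", len(data))
}
```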

targets/multifile.go

Lines changed: 97 additions & 27 deletions

```diff
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/Azure/blobporter/pipeline"
+	"github.com/Azure/blobporter/util"
 )
 
 ////////////////////////////////////////////////////////////
@@ -18,7 +19,7 @@ import (
 //MultiFile represents an OS file(s) target
 type MultiFile struct {
 	Container string
-	FileHandles map[string]chan *os.File
+	FileHandles sync.Map
 	NumberOfHandles int
 	OverWrite bool
 	sync.Mutex
@@ -27,51 +28,43 @@ type MultiFile struct {
 //NewMultiFile creates a new multi file target and 'n' number of handles for concurrent writes to a file.
 func NewMultiFile(overwrite bool, numberOfHandles int) pipeline.TargetPipeline {
 
-	return &MultiFile{FileHandles: make(map[string]chan *os.File), NumberOfHandles: numberOfHandles, OverWrite: overwrite}
+	//return &MultiFile{FileHandles: make(map[string]chan *os.File), NumberOfHandles: numberOfHandles, OverWrite: overwrite}
+	return &MultiFile{NumberOfHandles: numberOfHandles, OverWrite: overwrite}
 }
 
 //PreProcessSourceInfo implementation of PreProcessSourceInfo from the pipeline.TargetPipeline interface.
 //Passthrough no need to pre-process for a file target.
 func (t *MultiFile) PreProcessSourceInfo(source *pipeline.SourceInfo, blockSize uint64) (err error) {
-	t.Lock()
-	defer t.Unlock()
+	return nil
+}
 
+func (t *MultiFile) createFileIfNotExists(targetAlias string) error {
 	var fh *os.File
+	var err error
 
-	path := filepath.Dir(source.TargetAlias)
+	path := filepath.Dir(targetAlias)
 
 	if path != "" {
 		err = os.MkdirAll(path, 0777)
 
 		if err != nil {
-			return
+			return err
 		}
 	}
-
-	if fh, err = os.Create(source.TargetAlias); os.IsExist(err) {
+	defer fh.Close()
+	if fh, err = os.Create(targetAlias); os.IsExist(err) {
 		if !t.OverWrite {
 			return fmt.Errorf("The file already exists and file overwrite is disabled")
 		}
-		if err = os.Remove(source.TargetAlias); err != nil {
+		if err = os.Remove(targetAlias); err != nil {
 			return err
 		}
 
-		if fh, err = os.Create(source.TargetAlias); err != nil {
+		if fh, err = os.Create(targetAlias); err != nil {
 			return err
 		}
-	}
-
-	fhQ := make(chan *os.File, t.NumberOfHandles)
-
-	for i := 0; i < t.NumberOfHandles; i++ {
 
-		if fh, err = os.OpenFile(source.TargetAlias, os.O_WRONLY, os.ModeAppend); err != nil {
-			log.Fatal(err)
-		}
-
-		fhQ <- fh
 	}
-	t.FileHandles[source.TargetAlias] = fhQ
 
 	return nil
 }
@@ -80,10 +73,18 @@ func (t *MultiFile) PreProcessSourceInfo(source *pipeline.SourceInfo, blockSize uint64) (err error) {
 //For a file download a final commit is not required and this implementation closes all the filehandles.
 func (t *MultiFile) CommitList(listInfo *pipeline.TargetCommittedListInfo, numberOfBlocks int, targetName string) (msg string, err error) {
 
-	close(t.FileHandles[targetName])
+	value, ok := t.FileHandles.Load(targetName)
+	util.PrintfIfDebug("CommitList -> targetname:%v listinfo:%+v\n", targetName, *listInfo)
+
+	if !ok {
+		return "", nil
+	}
+
+	fhq := value.(chan *os.File)
+	close(fhq)
 	for {
 
-		fh, ok := <-t.FileHandles[targetName]
+		fh, ok := <-fhq
 
 		if !ok {
 			break
@@ -104,21 +105,90 @@
 //ProcessWrittenPart implements ProcessWrittenPart from the pipeline.TargetPipeline interface.
 //Passthrough implementation as no post-written-processing is required (e.g. maintain a list) when files are downloaded.
 func (t *MultiFile) ProcessWrittenPart(result *pipeline.WorkerResult, listInfo *pipeline.TargetCommittedListInfo) (requeue bool, err error) {
+	//fmt.Printf("WrittenPart->%+v \n", *result)
 	return false, nil
 }
 
+func (t *MultiFile) loadHandle(part *pipeline.Part) (*os.File, error) {
+	var fh *os.File
+	var fhQ chan *os.File
+	var err error
+
+	//if this is a small file open the handle but not leave open
+	if part.NumberOfBlocks == 1 {
+		if err = t.createFileIfNotExists(part.TargetAlias); err != nil {
+			return nil, err
+		}
+		if fh, err = os.OpenFile(part.TargetAlias, os.O_WRONLY, os.ModeAppend); err != nil {
+			return nil, err
+		}
+		return fh, nil
+	}
+
+	t.Lock()
+	value, _ := t.FileHandles.Load(part.TargetAlias)
+	if value == nil {
+		fhQ = make(chan *os.File, t.NumberOfHandles)
+		if err = t.createFileIfNotExists(part.TargetAlias); err != nil {
+			return nil, err
+		}
+		for i := 0; i < t.NumberOfHandles; i++ {
+			if fh, err = os.OpenFile(part.TargetAlias, os.O_WRONLY, os.ModeAppend); err != nil {
+				return nil, err
+			}
+			fhQ <- fh
+		}
+		t.FileHandles.Store(part.TargetAlias, fhQ)
+
+	} else {
+		fhQ = value.(chan *os.File)
+	}
+	t.Unlock()
+
+	fh = <-fhQ
+	return fh, nil
+}
+
+func (t *MultiFile) closeOrKeepHandle(part *pipeline.Part, fh *os.File) error {
+	if part.NumberOfBlocks == 1 {
+		return fh.Close()
+	}
+
+	t.Lock()
+	defer t.Unlock()
+	value, ok := t.FileHandles.Load(part.TargetAlias)
+
+	if ok {
+		fhq := value.(chan *os.File)
+		fhq <- fh
+		t.FileHandles.Store(part.TargetAlias, fhq)
+		return nil
+	}
+
+	return fmt.Errorf("File handle channel not found in the map")
+
+}
+
 //WritePart implements WritePart from the pipeline.TargetPipeline interface.
 //Writes to a local file using a filehandle received from a channel.
 func (t *MultiFile) WritePart(part *pipeline.Part) (duration time.Duration, startTime time.Time, numOfRetries int, err error) {
 	startTime = time.Now()
+	var fh *os.File
 
-	fh := <-t.FileHandles[part.TargetAlias]
-	if _, err := fh.WriteAt((*part).Data, int64((*part).Offset)); err != nil {
+	if fh, err = t.loadHandle(part); err != nil {
 		log.Fatal(err)
 	}
-	duration = time.Now().Sub(startTime)
 
-	t.FileHandles[part.TargetAlias] <- fh
+	if _, err = fh.WriteAt((*part).Data, int64((*part).Offset)); err != nil {
+		log.Fatal(err)
+	}
+
+	//fmt.Printf("Part Written:%+v\n", part.Ordinal)
+
+	if err = t.closeOrKeepHandle(part, fh); err != nil {
+		log.Fatal(err)
+	}
+	duration = time.Now().Sub(startTime)
 
 	return
 }
```
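
This refactor replaces a mutex-guarded `map[string]chan *os.File` with a lazily populated `sync.Map` of buffered channels, where each channel acts as a pool of open handles for one target file, and small single-block files bypass the pool entirely. A standalone sketch of that handle-pool pattern with simplified names (an illustration of the technique, not BlobPorter's actual types):

```go
package main

import (
	"fmt"
	"os"
	"sync"
)

// handlePool hands out *os.File handles for concurrent positional writes.
// Each target file gets a buffered channel of handles, created on first use.
type handlePool struct {
	handles         sync.Map // map[string]chan *os.File
	numberOfHandles int
	mu              sync.Mutex
}

// acquire returns an open handle for path, lazily building the pool the
// first time the path is seen.
func (p *handlePool) acquire(path string) (*os.File, error) {
	p.mu.Lock()
	value, ok := p.handles.Load(path)
	if !ok {
		q := make(chan *os.File, p.numberOfHandles)
		for i := 0; i < p.numberOfHandles; i++ {
			fh, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0666)
			if err != nil {
				p.mu.Unlock()
				return nil, err
			}
			q <- fh
		}
		p.handles.Store(path, q)
		value = q
	}
	p.mu.Unlock()

	// Receiving outside the lock means writers block on handle
	// availability, not on the mutex.
	return <-value.(chan *os.File), nil
}

// release puts a handle back so another writer can reuse it.
func (p *handlePool) release(path string, fh *os.File) error {
	value, ok := p.handles.Load(path)
	if !ok {
		return fmt.Errorf("no handle pool for %s", path)
	}
	value.(chan *os.File) <- fh
	return nil
}

func main() {
	p := &handlePool{numberOfHandles: 2}
	fh, err := p.acquire("out.dat")
	if err != nil {
		panic(err)
	}
	if _, err := fh.WriteAt([]byte("hello"), 0); err != nil { // positional write, safe per handle
		panic(err)
	}
	if err := p.release("out.dat", fh); err != nil {
		panic(err)
	}
}
```

Holding the mutex only around pool creation, and parking idle handles in the channel, keeps concurrent writers from serializing on the lock while still bounding the number of open descriptors per file.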
