Skip to content

Commit 3270568

Browse files
authored
Merge pull request #95 from Azure/dev
v0.6.14
2 parents 64224cc + f8b0d5d commit 3270568

File tree

10 files changed

+290
-41
lines changed

10 files changed

+290
-41
lines changed

blobporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func main() {
171171
func getProgressBarDelegate(totalSize uint64, quietMode bool) func(r pipeline.WorkerResult, committedCount int, bufferLevel int) {
172172
dataTransferred = 0
173173
targetRetries = 0
174-
if quietMode {
174+
if quietMode || totalSize == 0 {
175175
return func(r pipeline.WorkerResult, committedCount int, bufferLevel int) {
176176
atomic.AddInt32(&targetRetries, int32(r.Stats.Retries))
177177
dataTransferred = dataTransferred + uint64(r.BlockSize)

docs/gettingstarted.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Download, extract and set permissions
99

1010
::
1111

12-
wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.6.12/bp_linux.tar.gz
12+
wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.6.14/bp_linux.tar.gz
1313
tar -xvf bp_linux.tar.gz linux_amd64/blobporter
1414
chmod +x ~/linux_amd64/blobporter
1515
cd ~/linux_amd64
@@ -26,7 +26,7 @@ Set environment variables: ::
2626
Windows
2727
-------
2828

29-
Download `BlobPorter.exe <https://github.com/Azure/blobporter/releases/download/v0.6.12/bp_windows.zip>`_
29+
Download `BlobPorter.exe <https://github.com/Azure/blobporter/releases/download/v0.6.14/bp_windows.zip>`_
3030

3131
Set environment variables (if using the command prompt): ::
3232

internal/azutil.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ import (
1111
"os"
1212
"syscall"
1313
"time"
14-
"github.com/Azure/blobporter/util"
14+
1515
"github.com/Azure/azure-pipeline-go/pipeline"
16+
"github.com/Azure/blobporter/util"
1617

1718
"github.com/Azure/azure-storage-blob-go/2016-05-31/azblob"
1819
)
@@ -168,6 +169,12 @@ func (p *AzUtil) PutBlockList(blobName string, blockIDs []string) error {
168169
return nil
169170
}
170171

172+
//PutEmptyBlockBlob TODO
173+
func (p *AzUtil) PutEmptyBlockBlob(blobName string) error {
174+
empty := make([]string, 0)
175+
return p.PutBlockList(blobName, empty)
176+
}
177+
171178
//PutBlock TODO
172179
func (p *AzUtil) PutBlock(container string, blobName string, id string, body io.ReadSeeker) error {
173180
curl := p.serviceURL.NewContainerURL(container)
@@ -190,7 +197,6 @@ func (p *AzUtil) PutBlockBlob(blobName string, body io.ReadSeeker, md5 []byte) e
190197

191198
h := azblob.BlobHTTPHeaders{}
192199

193-
194200
//16 is md5.Size
195201
if len(md5) != 16 {
196202
var md5bytes [16]byte
@@ -406,13 +412,13 @@ func (*retriableError) Temporary() bool {
406412
return true
407413
}
408414

409-
const tcpKeepOpenMinLength = 8 * int64(util.MB)
415+
const tcpKeepOpenMinLength = 8 * int64(util.MB)
410416

411417
func (p *clientPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
412418
req := request.WithContext(ctx)
413-
419+
414420
if req.ContentLength < tcpKeepOpenMinLength {
415-
req.Close=true
421+
req.Close = true
416422
}
417423

418424
r, err := pipelineHTTPClient.Do(req)

internal/const.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
)
77

88
//ProgramVersion blobporter version
9-
const ProgramVersion = "0.6.12"
9+
const ProgramVersion = "0.6.14"
1010

1111
//HTTPClientTimeout HTTP client timeout when reading from HTTP sources and try timeout for blob storage operations.
1212
var HTTPClientTimeout = 90

pipeline/pipeline.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,25 @@ type PartsPartition struct {
122122
Parts []Part
123123
}
124124

125+
func newEmptyPartition(blockSize int64, sourceURI string, targetAlias string, bufferQ chan []byte) *PartsPartition {
126+
return &PartsPartition{
127+
Offset: 0,
128+
NumOfParts: 1,
129+
TotalNumOfParts: 1,
130+
TotalSize: 0,
131+
Parts: []Part{Part{
132+
Offset: 0,
133+
BlockSize: uint32(blockSize),
134+
BytesToRead: 0,
135+
Ordinal: 0,
136+
SourceURI: sourceURI,
137+
TargetAlias: targetAlias,
138+
NumberOfBlocks: 1,
139+
BufferQ: bufferQ,
140+
}},
141+
}
142+
}
143+
125144
//createPartsInPartition creates the parts in the partition arithmetically.
126145
func createPartsInPartition(partitionSize int64, partitionOffSet int64, ordinalStart int, sourceNumOfBlocks int, blockSize int64, sourceURI string, targetAlias string, bufferQ chan []byte) (parts []Part, ordinal int, numOfPartsInPartition int) {
127146
var bytesLeft = partitionSize
@@ -150,7 +169,23 @@ func createPartsInPartition(partitionSize int64, partitionOffSet int64, ordinalS
150169
}
151170

152171
//ConstructPartsPartition creates a slice of PartsPartition with a len of numberOfPartitions.
153-
func ConstructPartsPartition(numberOfPartitions int, size int64, blockSize int64, sourceURI string, targetAlias string, bufferQ chan []byte) []PartsPartition {
172+
func ConstructPartsPartition(numOfPartitions int, size int64, blockSize int64, sourceURI string, targetAlias string, bufferQ chan []byte) []PartsPartition {
173+
174+
//if an empty file create a empty partition
175+
if size == 0 {
176+
partition := newEmptyPartition(blockSize, sourceURI, targetAlias, bufferQ)
177+
return []PartsPartition{*partition}
178+
}
179+
numberOfPartitions := numOfPartitions
180+
181+
if size < (int64(numberOfPartitions) * blockSize) {
182+
numberOfPartitions = int((size + blockSize - 1) / blockSize)
183+
184+
if numberOfPartitions == 0 {
185+
numberOfPartitions = 1
186+
}
187+
}
188+
154189
numOfBlocks := int((size + blockSize - 1) / blockSize)
155190
Partitions := make([]PartsPartition, numberOfPartitions)
156191
//the size of the partition needs to be a multiple (blockSize * int) to make sure all but the last part/block

pipeline/pipeline_test.go

Lines changed: 103 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,10 @@ package pipeline
33
import (
44
"testing"
55

6-
"os"
7-
86
"github.com/Azure/blobporter/util"
97
"github.com/stretchr/testify/assert"
108
)
119

12-
var accountName = os.Getenv("ACCOUNT_NAME")
13-
var accountKey = os.Getenv("ACCOUNT_KEY")
1410
var blockSize = uint64(4 * util.MiByte)
1511
var numOfReaders = 10
1612
var numOfWorkers = 10
@@ -55,6 +51,109 @@ func TestCreatePartsInPartitionOfSize1(t *testing.T) {
5551
assert.Equal(t, 2, ordinal)
5652
assert.Equal(t, 1, numOfPartsInPartition)
5753
}
54+
func TestConstructPartitionsWhenSizeIsZero(t *testing.T) {
55+
partitionNumber := 10
56+
var size int64
57+
var blockSize int64 = 10000
58+
sourceURI := "S1"
59+
targetAlias := "TA"
60+
partitions := ConstructPartsPartition(partitionNumber, size, blockSize, sourceURI, targetAlias, nil)
61+
62+
assert.Equal(t, len(partitions), 1)
63+
64+
var offSet int64 = 1000
65+
var i int64
66+
var calcSize int64
67+
var calcSize2 int64
68+
j := 0
69+
p := partitions[0]
70+
assert.Equal(t, 1, p.NumOfParts)
71+
assert.Equal(t, offSet*i, p.Offset)
72+
assert.Equal(t, 1, int(p.TotalNumOfParts))
73+
assert.Equal(t, size, p.TotalSize)
74+
assert.Equal(t, 0, int(p.PartitionSize))
75+
assert.Equal(t, 1, len(p.Parts))
76+
calcSize = calcSize + p.PartitionSize
77+
78+
pip := p.Parts[0]
79+
calcSize2 = calcSize2 + int64(pip.BytesToRead)
80+
assert.Equal(t, j, pip.Ordinal)
81+
j++
82+
83+
assert.Equal(t, calcSize, size)
84+
assert.Equal(t, calcSize2, size)
85+
}
86+
func TestConstructPartitionsWhenSizeIsLessAsPartCap(t *testing.T) {
87+
partitionNumber := 10
88+
var size int64 = 10000
89+
var blockSize int64 = 10000
90+
sourceURI := "S1"
91+
targetAlias := "TA"
92+
partitions := ConstructPartsPartition(partitionNumber, size, blockSize, sourceURI, targetAlias, nil)
93+
94+
assert.Equal(t, len(partitions), int(size/blockSize))
95+
96+
var offSet int64 = 1000
97+
var i int64
98+
var calcSize int64
99+
var calcSize2 int64
100+
j := 0
101+
for _, p := range partitions {
102+
assert.Equal(t, 1, p.NumOfParts)
103+
assert.Equal(t, offSet*i, p.Offset)
104+
assert.Equal(t, (size+blockSize-1)/blockSize, p.TotalNumOfParts)
105+
assert.Equal(t, size, p.TotalSize)
106+
assert.Equal(t, size/int64(len(partitions)), p.PartitionSize)
107+
assert.Equal(t, 1, len(p.Parts))
108+
i++
109+
calcSize = calcSize + p.PartitionSize
110+
111+
for _, pip := range p.Parts {
112+
calcSize2 = calcSize2 + int64(pip.BytesToRead)
113+
assert.Equal(t, j, pip.Ordinal)
114+
j++
115+
}
116+
}
117+
118+
assert.Equal(t, calcSize, size)
119+
assert.Equal(t, calcSize2, size)
120+
}
121+
122+
func TestConstructPartitionsWhenSizeIsSameAsPartCap(t *testing.T) {
123+
partitionNumber := 10
124+
var size int64 = 10000
125+
var blockSize int64 = 1000
126+
sourceURI := "S1"
127+
targetAlias := "TA"
128+
partitions := ConstructPartsPartition(partitionNumber, size, blockSize, sourceURI, targetAlias, nil)
129+
130+
assert.Equal(t, partitionNumber, len(partitions))
131+
132+
var offSet int64 = 1000
133+
var i int64
134+
var calcSize int64
135+
var calcSize2 int64
136+
j := 0
137+
for _, p := range partitions {
138+
assert.Equal(t, 1, p.NumOfParts)
139+
assert.Equal(t, offSet*i, p.Offset)
140+
assert.Equal(t, (size+blockSize-1)/blockSize, p.TotalNumOfParts)
141+
assert.Equal(t, size, p.TotalSize)
142+
assert.Equal(t, (size+int64(partitionNumber)-1)/int64(partitionNumber), p.PartitionSize)
143+
assert.Equal(t, ((size+int64(partitionNumber)-1)/int64(partitionNumber))/blockSize, int64(len(p.Parts)))
144+
i++
145+
calcSize = calcSize + p.PartitionSize
146+
147+
for _, pip := range p.Parts {
148+
calcSize2 = calcSize2 + int64(pip.BytesToRead)
149+
assert.Equal(t, j, pip.Ordinal)
150+
j++
151+
}
152+
}
153+
154+
assert.Equal(t, calcSize, size)
155+
assert.Equal(t, calcSize2, size)
156+
}
58157

59158
func TestConstructPartitionExact(t *testing.T) {
60159
partitionNumber := 10

0 commit comments

Comments
 (0)