Commit dfc9ddb

committed
- Implementation of the performance pipelines for benchmarking
1 parent 254ab34 commit dfc9ddb

5 files changed: +319 −1 lines changed

blobporter.go

Lines changed: 4 additions & 1 deletion

@@ -12,6 +12,8 @@ import (
 	"sync/atomic"
 	"time"

+	"github.com/Azure/blobporter/sources"
+
 	"github.com/Azure/blobporter/pipeline"
 	"github.com/Azure/blobporter/transfer"
 	"github.com/Azure/blobporter/util"
@@ -37,6 +39,7 @@ var storageClientHTTPTimeout int

 var sourceParameters map[string]string
 var sourceAuthorization string
+var perfSourceDefinitions []sources.SourceDefinition

 var quietMode bool
 var calculateMD5 bool
@@ -54,7 +57,7 @@ const (
 	s3AccessKeyEnvVar = "S3_ACCESS_KEY"
 	s3SecretKeyEnvVar = "S3_SECRET_KEY"

-	programVersion = "0.5.23" // version number to show in help
+	programVersion = "0.5.24" // version number to show in help
 )

 const numOfWorkersFactor = 8

pipelinefactory.go

Lines changed: 76 additions & 0 deletions

@@ -125,6 +125,24 @@ func getPipelines() ([]pipeline.SourcePipeline, pipeline.TargetPipeline, error)
 			pvSourceInfoForS3IsReq,
 			pvContainerIsReq,
 			pvBlockSizeCheckForBlockBlobs)
+	case transfer.PerfToBlock:
+		return getTransferPipelines(getPerfSourceToBlockPipelines,
+			pvBlobAuthInfoIsReq,
+			pvContainerIsReq,
+			pvSourceURIISReq,
+			pvBlockSizeCheckForBlockBlobs,
+			pvPerfSourceIsReq)
+	case transfer.PerfToPage:
+		return getTransferPipelines(getPerfSourceToPagePipelines,
+			pvBlobAuthInfoIsReq,
+			pvContainerIsReq,
+			pvSourceURIISReq,
+			pvBlockSizeCheckForPageBlobs,
+			pvPerfSourceIsReq)
+	case transfer.BlobToPerf:
+		return getTransferPipelines(getBlobToPerfPipelines,
+			pvSourceURIISReq,
+			pvSourceInfoForBlobIsReq)

 }

@@ -245,6 +263,53 @@ func getBlobToPagePipelines() (source []pipeline.SourcePipeline, target pipeline.TargetPipeline, err error) {
 	target = targets.NewAzurePage(storageAccountName, storageAccountKey, containerName)
 	return
 }
+func getPerfSourceToPagePipelines() (source []pipeline.SourcePipeline, target pipeline.TargetPipeline, err error) {
+
+	defs := perfSourceDefinitions
+
+	params := sources.PerfSourceParams{
+		Definitions: defs,
+		SourceParams: sources.SourceParams{
+			CalculateMD5: calculateMD5}}
+
+	source = sources.NewPerfSourcePipeline(params, blockSize)
+	target = targets.NewAzurePage(storageAccountName, storageAccountKey, containerName)
+	return
+}
+
+func getPerfSourceToBlockPipelines() (source []pipeline.SourcePipeline, target pipeline.TargetPipeline, err error) {
+
+	defs := perfSourceDefinitions
+
+	params := sources.PerfSourceParams{
+		Definitions: defs,
+		SourceParams: sources.SourceParams{
+			CalculateMD5: calculateMD5}}
+
+	source = sources.NewPerfSourcePipeline(params, blockSize)
+	target = targets.NewAzureBlock(storageAccountName, storageAccountKey, containerName)
+	return
+}
+func getBlobToPerfPipelines() (source []pipeline.SourcePipeline, target pipeline.TargetPipeline, err error) {
+
+	blobNames = []string{sourceParameters["PREFIX"]}
+
+	params := &sources.AzureBlobParams{
+		Container:   sourceParameters["CONTAINER"],
+		BlobNames:   blobNames,
+		AccountName: sourceParameters["ACCOUNT_NAME"],
+		AccountKey:  sourceAuthorization,
+		SourceParams: sources.SourceParams{
+			CalculateMD5:      calculateMD5,
+			UseExactNameMatch: exactNameMatch,
+			FilesPerPipeline:  numberOfFilesInBatch,
+			//default to always true so blob names are kept
+			KeepDirStructure: true}}
+
+	source = sources.NewAzureBlob(params)
+	target = targets.NewPerfTargetPipeline()
+	return
+}
 func getBlobToBlockPipelines() (source []pipeline.SourcePipeline, target pipeline.TargetPipeline, err error) {

 	blobNames = []string{sourceParameters["PREFIX"]}
@@ -521,6 +586,17 @@ func pvContainerIsReq() error {
 	return nil
 }

+func pvPerfSourceIsReq() error {
+	var err error
+	perfSourceDefinitions, err = sources.ParseSourceDefinitions(sourceURIs[0])
+
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func pvBlobAuthInfoIsReq() error {

 	// wasn't specified, try the environment variable
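
The new cases follow the factory's existing pattern: a pipeline constructor is handed to getTransferPipelines together with a chain of parameter validators (the pv* functions) that run before the pipelines are built. A minimal standalone sketch of that composition, where buildPipelines, validator, and the stand-in validators are hypothetical names rather than blobporter's API:

package main

import (
	"errors"
	"fmt"
)

type validator func() error

// buildPipelines runs every validation before constructing the pipelines;
// the first failure aborts the setup, mirroring the factory's behavior.
func buildPipelines(construct func() (string, error), validators ...validator) (string, error) {
	for _, v := range validators {
		if err := v(); err != nil {
			return "", err
		}
	}
	return construct()
}

func main() {
	construct := func() (string, error) { return "perf-source -> blockblob-target", nil }
	containerIsSet := func() error { return nil }                                           // stands in for pvContainerIsReq
	missingAuth := func() error { return errors.New("account name and key are required") } // stands in for pvBlobAuthInfoIsReq

	fmt.Println(buildPipelines(construct, containerIsSet))
	fmt.Println(buildPipelines(construct, containerIsSet, missingAuth))
}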

sources/perfsource.go

Lines changed: 179 additions & 0 deletions

@@ -0,0 +1,179 @@
+package sources
+
+import (
+	"fmt"
+	"math/rand"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/Azure/blobporter/pipeline"
+	"github.com/Azure/blobporter/util"
+)
+
+//SourceDefinition describes a set of synthetic sources of the same size.
+type SourceDefinition struct {
+	Names           []string
+	Size            uint64
+	NumberOfSources int
+}
+
+//ParseSourceDefinitions parses a semicolon-separated list of [Size]:[NumOfSources] definitions.
+func ParseSourceDefinitions(def string) ([]SourceDefinition, error) {
+	data := strings.Split(def, ";")
+	defs := make([]SourceDefinition, len(data))
+	for s := 0; s < len(data); s++ {
+		def, err := newSourceDefinition(data[s])
+		if err != nil {
+			return nil, err
+		}
+		defs[s] = *def
+	}
+	return defs, nil
+}
+
+func newSourceDefinition(def string) (*SourceDefinition, error) {
+	data := strings.Split(def, ":")
+
+	if len(data) != 2 {
+		return nil, fmt.Errorf("Invalid format: %s\nThe source definition must be [Size]:[NumOfSources]", data)
+	}
+
+	size, err := util.ByteCountFromSizeString(data[0])
+	if err != nil {
+		return nil, fmt.Errorf("Invalid format. The source definition must be [Size]:[NumOfSources]. The size is invalid. Error:%v", err)
+	}
+	var numOfSrcs int
+	numOfSrcs, err = strconv.Atoi(data[1])
+	if err != nil {
+		return nil, fmt.Errorf("Invalid format. The source definition must be [Size]:[NumOfSources]. Failed to parse the number of sources. Error:%v", err)
+	}
+	names := make([]string, numOfSrcs)
+	for n := 0; n < numOfSrcs; n++ {
+		names[n] = fmt.Sprintf("%s%v.dat", strings.Replace(def, ":", "_", 1), time.Now().Nanosecond())
+	}
+
+	return &SourceDefinition{Size: size, NumberOfSources: numOfSrcs, Names: names}, nil
+}
+
+func (d *SourceDefinition) getSourceInfo() []pipeline.SourceInfo {
+	infos := make([]pipeline.SourceInfo, len(d.Names))
+	for i, n := range d.Names {
+		infos[i] = pipeline.SourceInfo{SourceName: n, TargetAlias: n, Size: d.Size}
+	}
+	return infos
+}
+
+//PerfSourcePipeline is a source pipeline that generates data in memory for performance tests.
+type PerfSourcePipeline struct {
+	definitions []SourceDefinition
+	blockSize   uint64
+	dataBlock   []byte
+	includeMD5  bool
+}
+
+//PerfSourceParams holds the parameters used to construct a PerfSourcePipeline.
+type PerfSourceParams struct {
+	SourceParams
+	Definitions []SourceDefinition
+}
+
+//NewPerfSourcePipeline creates a PerfSourcePipeline for the given source definitions.
+func NewPerfSourcePipeline(params PerfSourceParams, blockSize uint64) []pipeline.SourcePipeline {
+	ssps := make([]pipeline.SourcePipeline, 1)
+	ssp := PerfSourcePipeline{
+		definitions: params.Definitions,
+		blockSize:   blockSize,
+		includeMD5:  params.CalculateMD5}
+	ssp.setSharedDataBlock()
+	ssps[0] = &ssp
+	return ssps
+}
+
+//ConstructBlockInfoQueue builds the parts queue, interleaving parts from all sources round-robin.
+func (s *PerfSourcePipeline) ConstructBlockInfoQueue(blockSize uint64) (partitionQ chan pipeline.PartsPartition, partsQ chan pipeline.Part, numOfBlocks int, size uint64) {
+	allParts := make([][]pipeline.Part, len(s.definitions))
+	//disable memory buffer for parts (bufferQ == nil)
+	var bufferQ chan []byte
+	largestNumOfParts := 0
+	for i, source := range s.definitions {
+		srcParts := make([]pipeline.Part, 0)
+		for _, name := range source.Names {
+			size = size + source.Size
+			parts, sourceNumOfBlocks := pipeline.ConstructPartsQueue(source.Size, blockSize, name, name, bufferQ)
+			srcParts = append(srcParts, parts...)
+			numOfBlocks = numOfBlocks + sourceNumOfBlocks
+		}
+		allParts[i] = srcParts
+		if largestNumOfParts < len(srcParts) {
+			largestNumOfParts = len(srcParts)
+		}
+	}
+
+	partsQ = make(chan pipeline.Part, numOfBlocks)
+
+	for i := 0; i < largestNumOfParts; i++ {
+		for _, ps := range allParts {
+			if i < len(ps) {
+				partsQ <- ps[i]
+			}
+		}
+	}
+
+	close(partsQ)
+
+	return
+}
+
+//ExecuteReader serves each part from the shared in-memory data block; no I/O is performed.
+func (s *PerfSourcePipeline) ExecuteReader(partitionQ chan pipeline.PartsPartition, partsQ chan pipeline.Part, readPartsQ chan pipeline.Part, id int, wg *sync.WaitGroup) {
+	var blocksHandled = 0
+	defer wg.Done()
+	for {
+		p, ok := <-partsQ
+
+		if !ok {
+			return // no more blocks of file data to be read
+		}
+		if s.includeMD5 {
+			p.MD5()
+		}
+
+		p.Data = s.dataBlock
+
+		if len(s.dataBlock) != int(p.BytesToRead) {
+			small := s.dataBlock[:p.BytesToRead]
+			p.Data = small
+		}
+
+		readPartsQ <- p
+		blocksHandled++
+	}
+}
+
+//GetSourcesInfo returns the source information for all synthetic sources.
+func (s *PerfSourcePipeline) GetSourcesInfo() []pipeline.SourceInfo {
+	srcInfo := make([]pipeline.SourceInfo, 0)
+	for _, def := range s.definitions {
+		srcInfo = append(srcInfo, def.getSourceInfo()...)
+	}
+	return srcInfo
+}
+
+var data = []byte("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ")
+var block []byte
+
+func (s *PerfSourcePipeline) setSharedDataBlock() {
+	s.dataBlock = s.getDataBlock(s.blockSize)
+}
+
+func (s *PerfSourcePipeline) getDataBlock(size uint64) []byte {
+	b := make([]byte, size)
+	ln := len(data)
+	for i := 0; i < len(b); i++ {
+		b[i] = data[rand.Intn(ln)]
+	}
+
+	return b
+}
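
For reference, a small sketch of how the new parser might be exercised. The "1GB"/"32MB" size suffixes are an assumption about what util.ByteCountFromSizeString accepts; the [Size]:[NumOfSources] shape itself comes from the error messages above:

package main

import (
	"fmt"

	"github.com/Azure/blobporter/sources"
)

func main() {
	// "1GB:2;32MB:4" is assumed to describe two 1 GB synthetic sources and
	// four 32 MB ones; definitions are separated by semicolons.
	defs, err := sources.ParseSourceDefinitions("1GB:2;32MB:4")
	if err != nil {
		panic(err)
	}
	for _, d := range defs {
		fmt.Printf("size=%d bytes, sources=%d, names=%v\n", d.Size, d.NumberOfSources, d.Names)
	}
}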

targets/perftarget.go

Lines changed: 51 additions & 0 deletions

@@ -0,0 +1,51 @@
+package targets
+
+import (
+	"time"
+
+	"github.com/Azure/blobporter/pipeline"
+)
+
+////////////////////////////////////////////////////////////
+///// Perf Target
+////////////////////////////////////////////////////////////
+
+//PerfTarget is a no-op target used to benchmark the rest of the pipeline.
+type PerfTarget struct {
+}
+
+//NewPerfTargetPipeline creates a new perf target
+func NewPerfTargetPipeline() pipeline.TargetPipeline {
+	return &PerfTarget{}
+}
+
+//PreProcessSourceInfo no-op; the perf target requires no setup.
+func (t *PerfTarget) PreProcessSourceInfo(source *pipeline.SourceInfo, blockSize uint64) (err error) {
+	return nil
+}
+
+//CommitList no-op; returns a confirmation message.
+func (t *PerfTarget) CommitList(listInfo *pipeline.TargetCommittedListInfo, NumberOfBlocks int, targetName string) (msg string, err error) {
+	msg = "Perf test committed"
+	err = nil
+	return
+}
+
+//ProcessWrittenPart no-op; parts are never requeued.
+func (t *PerfTarget) ProcessWrittenPart(result *pipeline.WorkerResult, listInfo *pipeline.TargetCommittedListInfo) (requeue bool, err error) {
+	requeue = false
+	err = nil
+	return
+}
+
+//WritePart discards the part and reports a near-zero duration.
+func (t *PerfTarget) WritePart(part *pipeline.Part) (duration time.Duration, startTime time.Time, numOfRetries int, err error) {
+
+	s := time.Now()
+	numOfRetries = 0
+	err = nil
+	startTime = s
+	duration = time.Now().Sub(s)
+
+	return
+}
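
Because WritePart never touches the network, the duration it reports is essentially per-part bookkeeping overhead, which makes it a useful baseline. A minimal sketch exercising the no-op target; passing a zero-value pipeline.Part is an assumption made for illustration:

package main

import (
	"fmt"

	"github.com/Azure/blobporter/pipeline"
	"github.com/Azure/blobporter/targets"
)

func main() {
	// the perf target performs no upload, so WritePart should return
	// almost immediately with zero retries and a nil error
	t := targets.NewPerfTargetPipeline()
	duration, startTime, retries, err := t.WritePart(&pipeline.Part{})
	fmt.Println(duration, startTime, retries, err)
}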

transfer/transfer.go

Lines changed: 9 additions & 0 deletions

@@ -84,6 +84,9 @@ const (
 	BlobToPage = "blob-pageblob"
 	S3ToBlock  = "s3-blockblob"
 	S3ToPage   = "s3-pageblob"
+	PerfToBlock = "perf-block"
+	PerfToPage  = "perf-page"
+	BlobToPerf  = "blob-perf"
 	none = "none"
 )

@@ -121,6 +124,12 @@ func ParseTransferDefinition(str string) (Definition, error) {
 		return S3ToBlock, nil
 	case "s3-pageblob":
 		return S3ToPage, nil
+	case "perf-block":
+		return PerfToBlock, nil
+	case "perf-page":
+		return PerfToPage, nil
+	case "blob-perf":
+		return BlobToPerf, nil
 	default:
 		return none, fmt.Errorf("%v is not a valid transfer definition value.\n Valid values: file-blockblob, http-blockblob, file-pageblob, http-pageblob, pageblob-file, blockblob-file, http-file", str)
 	}
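
The three new definition strings round-trip through the parser, and anything else falls through to the default error case; a quick sketch:

package main

import (
	"fmt"

	"github.com/Azure/blobporter/transfer"
)

func main() {
	// each new transfer definition string maps to its constant;
	// "bogus" demonstrates the default error path
	for _, s := range []string{"perf-block", "perf-page", "blob-perf", "bogus"} {
		def, err := transfer.ParseTransferDefinition(s)
		fmt.Println(def, err)
	}
}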
