Skip to content

Commit 881d700

Browse files
committed
docs updates and refactoring
1 parent 50b88ac commit 881d700

15 files changed

+266
-198
lines changed

.travis.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ before_install:
77
# Download the binary to bin folder in $GOPATH
88
- curl -L -s https://github.com/golang/dep/releases/download/v${DEP_VERSION}/dep-linux-amd64 -o $GOPATH/bin/dep
99
# Make the binary executable
10-
- chmod +x $GOPATH/bin/dep
11-
- go get -d github.com/stretchr/testify/assert
10+
#- chmod +x $GOPATH/bin/dep
11+
#- go get -d github.com/stretchr/testify/assert
1212
install:
13-
# - dep ensure
13+
- export GO111MODULE=on
1414
script:
1515
- go test -v ./...
1616
- mkdir linux_amd64

blobporter.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,10 @@ func main() {
149149
tfer := transfer.NewTransfer(sourcePipeline.Source, targetPipeline, argsUtil.params.numberOfReaders, argsUtil.params.numberOfWorkers, argsUtil.params.blockSize)
150150
tfer.SetTransferTracker(argsUtil.params.tracker)
151151
prog.newTransfer(float64(tfer.TotalSize), sourcesInfo, argsUtil.params.transferType)
152-
152+
153153
tfer.StartTransfer(argsUtil.params.dedupeLevel)
154154
tfer.WaitForCompletion()
155155

156-
157156
}
158157

159158
if argsUtil.params.tracker != nil {
@@ -164,7 +163,6 @@ func main() {
164163

165164
prog.displayGlobalSummary()
166165

167-
168166
}
169167

170168
func getProgressBarDelegate(totalSize uint64, quietMode bool) func(r pipeline.WorkerResult, committedCount int, bufferLevel int) {

docs/gettingstarted.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Download, extract and set permissions
99

1010
::
1111

12-
wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.6.15/bp_linux.tar.gz
12+
wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.6.20/bp_linux.tar.gz
1313
tar -xvf bp_linux.tar.gz linux_amd64/blobporter
1414
chmod +x ~/linux_amd64/blobporter
1515
cd ~/linux_amd64
@@ -26,7 +26,7 @@ Set environment variables: ::
2626
Windows
2727
-------
2828

29-
Download `BlobPorter.exe <https://github.com/Azure/blobporter/releases/download/v0.6.15/bp_windows.zip>`_
29+
Download `BlobPorter.exe <https://github.com/Azure/blobporter/releases/download/v0.6.20/bp_windows.zip>`_
3030

3131
Set environment variables (if using the command prompt): ::
3232

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ require (
1313
github.com/stretchr/testify v1.2.2
1414
golang.org/x/crypto v0.0.0-20180621125126-a49355c7e3f8
1515
golang.org/x/net v0.0.0-20180124060956-0ed95abb35c4
16+
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
1617
golang.org/x/sys v0.0.0-20180709060233-1b2967e3c290
1718
golang.org/x/text v0.3.0
1819
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
3131
golang.org/x/crypto v0.0.0-20180621125126-a49355c7e3f8/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
3232
golang.org/x/net v0.0.0-20180124060956-0ed95abb35c4 h1:BLERX6fu5dNMZcaGP2RzbrDZpHQbDkAoG9oiTRXbWr0=
3333
golang.org/x/net v0.0.0-20180124060956-0ed95abb35c4/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
34+
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
35+
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
3436
golang.org/x/sys v0.0.0-20180709060233-1b2967e3c290/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
3537
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
3638
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

internal/azutil.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func NewAzUtil(accountName string, accountKey string, container string, baseBlob
3232
creds, err := azblob.NewSharedKeyCredential(accountName, accountKey)
3333

3434
if err != nil {
35-
return nil, err
35+
return nil, fmt.Errorf(" error in the credential. Check the key value. Error:%v", err)
3636
}
3737

3838
pipeline := newPipeline(creds, azblob.PipelineOptions{

internal/eventsink.go

Lines changed: 65 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,12 @@ type eventAggregateReq struct {
8888

8989
type eventSink struct {
9090
sync.Mutex
91-
sums map[string]EventItemAggregate
92-
subs map[EventSource][]EventSubscription
93-
ondonesubs map[EventSource][]EventSubscription
94-
eventsQ chan EventItem
95-
subsQ chan EventSubscription
96-
wg *sync.WaitGroup
97-
flushed bool
91+
sums map[string]EventItemAggregate
92+
subs map[EventSource][]EventSubscription
93+
ondonesubs map[EventSource][]EventSubscription
94+
eventsQ chan EventItem
95+
wg *sync.WaitGroup
96+
workerIsDone bool
9897
}
9998

10099
func newEventSink() *eventSink {
@@ -104,72 +103,64 @@ func newEventSink() *eventSink {
104103
return e
105104

106105
}
107-
func (e *eventSink) Reset() error {
108-
defer e.Unlock()
109-
e.Lock()
110-
if !e.flushed {
111-
return fmt.Errorf("The sink is not flushed")
106+
func (e *eventSink) Reset() {
107+
108+
if !e.workerIsDone {
109+
e.FlushAndWait()
112110
}
111+
113112
e.init()
114113
e.startWorker()
115114

116-
return nil
115+
return
117116
}
118117

119118
func (e *eventSink) startWorker() {
120-
e.flushed = false
119+
e.workerIsDone = false
121120
go func() {
122121
var sumEvent EventItemAggregate
123-
defer func() { e.flushed = true }()
122+
defer func() {
123+
e.workerIsDone = true
124+
}()
124125
for {
125-
select {
126-
case event, ok := <-e.eventsQ:
127-
if !ok {
128-
defer func() {
129-
for _, ondonesub := range e.ondonesubs {
130-
for _, sub := range ondonesub {
131-
for _, sumEvent := range e.sums {
132-
if sub.Source == sumEvent.LastEventItem.Source {
133-
sub.Delegate(sumEvent.LastEventItem, sumEvent)
134-
}
126+
127+
event, ok := <-e.eventsQ
128+
129+
if !ok {
130+
131+
defer func() {
132+
e.Lock()
133+
defer e.Unlock()
134+
for _, ondonesub := range e.ondonesubs {
135+
for _, sub := range ondonesub {
136+
for _, sumEvent := range e.sums {
137+
if sub.Source == sumEvent.LastEventItem.Source {
138+
sub.Delegate(sumEvent.LastEventItem, sumEvent)
135139
}
136140
}
137141
}
138-
e.wg.Done()
139-
}()
140-
return
141-
}
142-
143-
if event.Action == Sum {
144-
sumEvent = e.sums[event.key()]
145-
sumEvent.NumItems++
146-
sumEvent.LastEventItem = event
147-
value := event.Data[0].Value.(float64)
148-
sumEvent.Value += value
149-
e.sums[event.key()] = sumEvent
150-
}
151-
152-
esubs := e.subs[event.Source]
153-
for _, sub := range esubs {
154-
sub.Delegate(event, sumEvent)
155-
}
156-
157-
case sub, ok := <-e.subsQ:
158-
if !ok {
159-
continue
160-
}
161-
switch sub.Type {
162-
case RealTime:
163-
subsForSource := e.subs[sub.Source]
164-
subsForSource = append(subsForSource, sub)
165-
e.subs[sub.Source] = subsForSource
166-
case OnDone:
167-
subsForSource := e.ondonesubs[sub.Source]
168-
subsForSource = append(subsForSource, sub)
169-
e.ondonesubs[sub.Source] = subsForSource
170-
}
142+
}
143+
e.wg.Done()
144+
}()
145+
146+
return
147+
}
148+
149+
if event.Action == Sum {
150+
sumEvent = e.sums[event.key()]
151+
sumEvent.NumItems++
152+
sumEvent.LastEventItem = event
153+
value := event.Data[0].Value.(float64)
154+
sumEvent.Value += value
155+
e.sums[event.key()] = sumEvent
156+
}
171157

158+
e.Lock()
159+
esubs := e.subs[event.Source]
160+
for _, sub := range esubs {
161+
sub.Delegate(event, sumEvent)
172162
}
163+
e.Unlock()
173164
}
174165
}()
175166
}
@@ -178,28 +169,37 @@ func (e *eventSink) init() {
178169
e.subs = make(map[EventSource][]EventSubscription)
179170
e.ondonesubs = make(map[EventSource][]EventSubscription)
180171
e.eventsQ = make(chan EventItem, 10000)
181-
e.subsQ = make(chan EventSubscription, 100)
182172
e.wg = &sync.WaitGroup{}
183173
e.wg.Add(1)
184174
}
185175

186176
//FlushAndWait closese the sink's channels as waits for processing of pending events
187177
func (e *eventSink) FlushAndWait() {
188178
close(e.eventsQ)
189-
close(e.subsQ)
190179
e.wg.Wait()
180+
191181
}
192182

193183
//AddSubscription adds a subscription to the event sink
194184
func (e *eventSink) AddSubscription(source EventSource, subType EventSubscriptionType, delegate EventDelegate) {
195-
select {
196-
case e.subsQ <- EventSubscription{
185+
e.Lock()
186+
defer e.Unlock()
187+
188+
sub := EventSubscription{
197189
Delegate: delegate,
198190
Source: source,
199191
Type: subType,
200-
}:
201-
default:
202-
panic(fmt.Errorf("AddSubscription failed the channel is closed or full"))
192+
}
193+
194+
switch subType {
195+
case RealTime:
196+
subsForSource := e.subs[source]
197+
subsForSource = append(subsForSource, sub)
198+
e.subs[source] = subsForSource
199+
case OnDone:
200+
subsForSource := e.ondonesubs[source]
201+
subsForSource = append(subsForSource, sub)
202+
e.ondonesubs[source] = subsForSource
203203
}
204204
}
205205

internal/eventsink_test.go

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,19 @@ import (
77
)
88

99
type mockDel struct {
10-
numOfEventItem int
11-
numOfEventItemAggregate int
12-
numOfCalls int
10+
numOfEventItem int
11+
numOfCalls int
12+
aggregateResult int
1313
}
1414

1515
func (m *mockDel) delegate(e EventItem, a EventItemAggregate) {
1616
m.numOfEventItem++
17-
m.numOfEventItemAggregate++
17+
1818
m.numOfCalls++
19+
20+
if e.Action == Sum {
21+
m.aggregateResult = int(a.Value)
22+
}
1923
}
2024

2125
const testSource EventSource = 10
@@ -50,8 +54,8 @@ func TestOnDoneEvents(t *testing.T) {
5054

5155
es.FlushAndWait()
5256

53-
assert.Equal(t, md.numOfEventItemAggregate, 3, "Values must match")
54-
assert.Equal(t, md.numOfCalls, 3, "Values must match")
57+
assert.Equal(t, md.aggregateResult, 3, "Values must match")
58+
assert.Equal(t, md.numOfCalls, 1, "Values must match")
5559
}
5660

5761
func TestFlushCycle(t *testing.T) {
@@ -61,30 +65,25 @@ func TestFlushCycle(t *testing.T) {
6165
es.AddSubscription(testSource, RealTime, md.delegate)
6266
es.AddEvent(testSource, testEvent, "", EventData{Value: 1})
6367

64-
assert.Equal(t, md.numOfEventItem, 1, "Values must match")
65-
assert.Equal(t, md.numOfCalls, 1, "Values must match")
68+
es.Reset()
6669

67-
err := es.Reset()
68-
assert.Error(t, err, "Reset must fail as it hasn't been flushed")
70+
assert.Equal(t, 1, md.numOfEventItem, "Values must match")
71+
assert.Equal(t, 1, md.numOfCalls, "Values must match")
6972

70-
//even if the reset fail the sink must be able to receive events
73+
es.AddSubscription(testSource, RealTime, md.delegate)
7174
es.AddEvent(testSource, testEvent, "", EventData{Value: 1})
72-
assert.Equal(t, md.numOfEventItem, 2, "Values must match")
73-
assert.Equal(t, md.numOfCalls, 2, "Values must match")
75+
76+
es.Reset()
77+
78+
assert.Equal(t, 2, md.numOfEventItem, "Values must match")
79+
assert.Equal(t, 2, md.numOfCalls, "Values must match")
7480

7581
//add a sum event
7682
es.AddSubscription(testSource, OnDone, md.delegate)
7783
es.AddSumEvent(testSource, testEvent, "", oneFloat64)
7884

79-
//the sum event must be triggered at the end...
80-
assert.Equal(t, md.numOfCalls, 2, "Values must match")
81-
82-
es.FlushAndWait()
83-
84-
assert.Equal(t, md.numOfEventItemAggregate, 1, "Values must match")
85-
assert.Equal(t, md.numOfCalls, 3, "Values must match")
86-
87-
err = es.Reset()
88-
assert.NoError(t, err, "Reset must succeed as the sink is flushed")
85+
es.Reset()
8986

87+
assert.Equal(t, 1, md.aggregateResult, "Values must match")
88+
assert.Equal(t, 3, md.numOfCalls, "Values must match")
9089
}

0 commit comments

Comments
 (0)