Skip to content

Commit 080466e

Browse files
committed
Correcting issue in fan-in
1 parent b8105ad commit 080466e

File tree

5 files changed

+25
-27
lines changed

5 files changed

+25
-27
lines changed

README.md

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,6 @@ To install the package, run:
1818
go get -u go.atomizer.io/stream@latest
1919
```
2020

21-
## Importing
22-
23-
It is recommended to use the package via the following import:
24-
25-
`import . "go.atomizer.io/stream"`
26-
27-
Using the `.` import allows for functions to be called directly as if the
28-
functions were in the same namespace without the need to append the package
29-
name.
30-
3121
## Benchmarks
3222

3323
To execute the benchmarks, run the following command:

scaler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"testing"
66
"time"
77

8-
. "go.structs.dev/gen"
8+
"go.structs.dev/gen"
99
)
1010

1111
var emptyFn = func(context.Context, any) (any, bool) { return 0, true }
@@ -24,7 +24,7 @@ func ScalerTest[U ~[]T, T comparable](
2424
ctx, cancel := context.WithCancel(context.Background())
2525
defer cancel()
2626

27-
testdata := Slice[T](data)
27+
testdata := gen.Slice[T](data)
2828

2929
integers := testdata.Map()
3030

stream.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ package stream
1313
import (
1414
"context"
1515
"reflect"
16+
"sync"
1617

17-
. "go.structs.dev/gen"
18+
"go.structs.dev/gen"
1819
)
1920

2021
// Pipe accepts an incoming data channel and pipes it to the supplied
@@ -110,16 +111,21 @@ func FanIn[T any](ctx context.Context, in ...<-chan T) <-chan T {
110111
return out
111112
}
112113

114+
var wg sync.WaitGroup
113115
defer func() {
114116
go func() {
115-
<-ctx.Done()
117+
wg.Wait()
116118
close(out)
117119
}()
118120
}()
119121

122+
wg.Add(len(in))
120123
for _, i := range in {
121124
// Pipe the result of the channel to the output channel.
122-
go Pipe(ctx, i, out)
125+
go func(i <-chan T) {
126+
defer wg.Done()
127+
Pipe(ctx, i, out)
128+
}(i)
123129
}
124130

125131
return out
@@ -177,7 +183,7 @@ func FanOut[T any](ctx context.Context, in <-chan T, out ...chan<- T) {
177183
return
178184
}
179185

180-
selectCases = Exclude(selectCases, selectCases[chosen])
186+
selectCases = gen.Exclude(selectCases, selectCases[chosen])
181187
}
182188
}
183189

stream_bench_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"testing"
66

7-
. "go.structs.dev/gen"
7+
"go.structs.dev/gen"
88
)
99

1010
func Benchmark_Pipe(b *testing.B) {
@@ -122,7 +122,7 @@ func Benchmark_Scaler(b *testing.B) {
122122
ctx, cancel := context.WithCancel(context.Background())
123123
defer cancel()
124124

125-
testdata := Slice[int](Ints[int](100))
125+
testdata := gen.Slice[int](Ints[int](100))
126126

127127
s := Scaler[int, int]{
128128
Fn: func(_ context.Context, in int) (int, bool) {

stream_test.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"testing"
66
"time"
77

8-
. "go.structs.dev/gen"
8+
"go.structs.dev/gen"
99
)
1010

1111
func PipeTest[U ~[]T, T comparable](
@@ -95,7 +95,7 @@ func FanInTest[U ~[]T, T comparable](
9595
out[i] = make(chan T)
9696
}
9797

98-
fan := FanIn(ctx, ReadOnly(out...)...)
98+
fan := FanIn(ctx, gen.ReadOnly(out...)...)
9999

100100
ichan := 0
101101
cursor := 0
@@ -117,23 +117,25 @@ func FanInTest[U ~[]T, T comparable](
117117
}
118118

119119
returned := make([]T, len(data))
120-
for i := 0; i < len(data); i++ {
120+
for i := 0; ; i++ {
121121
select {
122122
case <-ctx.Done():
123123
t.Error("context cancelled")
124124
return
125125
case out, ok := <-fan:
126126
if !ok {
127-
if i != len(data)-1 {
128-
t.Fatal("c2 closed prematurely")
127+
if i != len(data) {
128+
t.Fatalf("c2 closed prematurely; index %v", i)
129129
}
130+
131+
return
130132
}
131133

132134
returned[i] = out
133135
}
134136
}
135137

136-
diff := Diff(data, returned)
138+
diff := gen.Diff(data, returned)
137139
if len(diff) != 0 {
138140
t.Errorf("unexpected diff: %v", diff)
139141
}
@@ -222,7 +224,7 @@ func Test_Intercept_ChangeType(t *testing.T) {
222224

223225
out := Intercept(
224226
ctx,
225-
Slice[int](integers).Chan(ctx),
227+
gen.Slice[int](integers).Chan(ctx),
226228
func(_ context.Context, in int) (bool, bool) {
227229
return in%2 == 0, true
228230
})
@@ -355,7 +357,7 @@ func DistributeTest[U ~[]T, T comparable](
355357

356358
c1, c2, c3 := make(chan T), make(chan T), make(chan T)
357359

358-
go Distribute(ctx, Slice[T](data).Chan(ctx), c1, c2, c3)
360+
go Distribute(ctx, gen.Slice[T](data).Chan(ctx), c1, c2, c3)
359361

360362
c1total, c2total, c3total := 0, 0, 0
361363
for i := 0; i < len(data); i++ {
@@ -451,7 +453,7 @@ func Test_FanOut(t *testing.T) {
451453
var c4 chan int
452454
data := Ints[int](1000)
453455

454-
go FanOut(ctx, Slice[int](data).Chan(ctx), c1, c2, c3, c4)
456+
go FanOut(ctx, gen.Slice[int](data).Chan(ctx), c1, c2, c3, c4)
455457

456458
seen := make(map[int]int)
457459
for i := 0; i < len(data)*3; i++ {

0 commit comments

Comments
 (0)