Skip to content

Commit d0f2dfe

Browse files
committed
Removed the dependence on FanOut from pipe
1 parent 70187f4 commit d0f2dfe

File tree

1 file changed

+16
-2
lines changed

1 file changed

+16
-2
lines changed

stream.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,22 @@ import (
2626
func Pipe[T any](ctx context.Context, in <-chan T, out chan<- T) {
2727
ctx = _ctx(ctx)
2828

29-
// Pipe is just a fan-out of a single channel.
30-
FanOut(ctx, in, out)
29+
for {
30+
select {
31+
case <-ctx.Done():
32+
return
33+
case v, ok := <-in:
34+
if !ok {
35+
return
36+
}
37+
38+
select {
39+
case <-ctx.Done():
40+
return
41+
case out <- v:
42+
}
43+
}
44+
}
3145
}
3246

3347
type InterceptFunc[T, U any] func(context.Context, T) (U, bool)

0 commit comments

Comments
 (0)