Skip to content

Commit a6002ff

Browse files
committed
feat(streams.go): add Take function to limit the number of elements received from a channel
1 parent 9005f1a commit a6002ff

File tree

1 file changed

+26
-5
lines changed

1 file changed

+26
-5
lines changed

helper/streams/streams.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package streams
22

33
import (
44
"context"
5+
"errors"
56
"sync"
67
)
78

@@ -26,11 +27,11 @@ func New[T any](in []T) <-chan T {
2627
// the buffer of the channel is set to the number of values and the values are
2728
// pushed into the channel before returning.
2829
//
29-
// str, push, close := NewConcurrent(1, 2, 3)
30-
// push(context.TODO(), 4, 5, 6)
31-
// vals, err := All(str)
32-
// // handle err
33-
// // vals == []int{1, 2, 3, 4, 5, 6}
30+
// str, push, close := NewConcurrent(1, 2, 3)
31+
// push(context.TODO(), 4, 5, 6)
32+
// vals, err := All(str)
33+
// // handle err
34+
// // vals == []int{1, 2, 3, 4, 5, 6}
3435
//
3536
// Use the Concurrent function to create a `push` function for an existing channel.
3637
func NewConcurrent[T any](vals ...T) (_ <-chan T, _push func(context.Context, ...T) error, _close func()) {
@@ -104,6 +105,26 @@ func All[T any](in <-chan T, errs ...<-chan error) ([]T, error) {
104105
return Drain(context.Background(), in, errs...)
105106
}
106107

108+
var errTakeDone = errors.New("take done")
109+
110+
// Take receives elements from the input channel until it has received n
111+
// elements or the input channel is closed. It returns a slice containing the
112+
// received elements. If any error occurs during the process, it is returned as
113+
// the second return value.
114+
func Take[T any](ctx context.Context, n int, in <-chan T, errs ...<-chan error) ([]T, error) {
115+
out := make([]T, 0, n)
116+
if err := Walk(ctx, func(v T) error {
117+
out = append(out, v)
118+
if len(out) >= n {
119+
return errTakeDone
120+
}
121+
return nil
122+
}, in, errs...); err != nil && !errors.Is(err, errTakeDone) {
123+
return out, err
124+
}
125+
return out, nil
126+
}
127+
107128
// Walk receives from the given channel until it and and all provided error
108129
// channels are closed, ctx is closed or any of the provided error channels
109130
// receives an error. For every element e that is received from the input

0 commit comments

Comments
 (0)