Skip to content

Commit 52b039e

Browse files
committed
Adding drain and any methods for handling channel drains and any conversions
1 parent 080466e commit 52b039e

File tree

6 files changed

+165
-54
lines changed

6 files changed

+165
-54
lines changed

.golangci.yml

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,19 @@ linters-settings:
1717
goconst:
1818
min-len: 2
1919
min-occurrences: 2
20-
gocritic:
21-
enabled-tags:
22-
- diagnostic
23-
- experimental
24-
- opinionated
25-
- performance
26-
- style
27-
disabled-checks:
28-
- dupImport # https://github.com/go-critic/go-critic/issues/845
29-
- ifElseChain
30-
- octalLiteral
31-
- whyNoLint
32-
- wrapperFunc
20+
# gocritic:
21+
# enabled-tags:
22+
# - diagnostic
23+
# - experimental
24+
# - opinionated
25+
# - performance
26+
# - style
27+
# disabled-checks:
28+
# - dupImport # https://github.com/go-critic/go-critic/issues/845
29+
# - ifElseChain
30+
# - octalLiteral
31+
# - whyNoLint
32+
# - wrapperFunc
3333
gocyclo:
3434
min-complexity: 15
3535
goimports:
@@ -73,7 +73,7 @@ linters:
7373
- exhaustive
7474
- funlen
7575
- goconst
76-
- gocritic
76+
# - gocritic
7777
- gocyclo
7878
- gofmt
7979
- goimports
@@ -143,4 +143,4 @@ run:
143143
service:
144144
golangci-lint-version: 1.23.x # use the fixed version to not introduce new linters unexpectedly
145145
prepare:
146-
- echo "here I can run custom commands, but no preparation needed for this repo"
146+
- echo "here I can run custom commands, but no preparation needed for this repo"

.pre-commit-config.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,23 @@ repos:
88
entry: detect-secrets-hook
99
args: ['--baseline', '.secrets.baseline']
1010
- repo: https://github.com/golangci/golangci-lint
11-
rev: v1.43.0
11+
rev: v1.49.0
1212
hooks:
1313
- id: golangci-lint
1414
- repo: https://github.com/Bahjat/pre-commit-golang
15-
rev: c3086eea8af86847dbdff2e46b85a5fe3c9d9656 # pragma: allowlist secret
15+
rev: v1.0.3 # pragma: allowlist secret
1616
hooks:
1717
- id: go-unit-tests
1818
- repo: https://github.com/pre-commit/pre-commit-hooks
19-
rev: v4.0.1
19+
rev: v4.3.0
2020
hooks:
2121
- id: check-json
2222
- id: check-added-large-files
2323
- id: pretty-format-json
2424
- id: check-merge-conflict
2525
- id: check-yaml
2626
- repo: https://github.com/igorshubovych/markdownlint-cli
27-
rev: v0.30.0
27+
rev: v0.32.2
2828
hooks:
2929
- id: markdownlint-fix
3030
- repo: https://github.com/koalaman/shellcheck-precommit

scaler_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ func ScalerTest[U ~[]T, T comparable](
3535
}
3636

3737
// Test that the scaler can be used with a nil context.
38+
//nolint:staticcheck
3839
out, err := s.Exec(nil, testdata.Chan(ctx))
3940
if err != nil {
4041
t.Errorf("expected no error, got %v", err)
@@ -83,6 +84,7 @@ func Test_Scaler_Exec(t *testing.T) {
8384
func Test_Scaler_NilFn(t *testing.T) {
8485
s := Scaler[any, any]{}
8586

87+
//nolint:staticcheck
8688
_, err := s.Exec(nil, nil)
8789
if err == nil {
8890
t.Error("Expected error, got nil")
@@ -106,6 +108,7 @@ func Test_Scaler_NilCtx(t *testing.T) {
106108
cancel()
107109

108110
// Test that the scaler can be used with a nil context.
111+
//nolint:staticcheck
109112
out, err := s.Exec(nil, nil)
110113
if err != nil {
111114
t.Errorf("expected no error, got %v", err)
@@ -130,6 +133,7 @@ func Test_Scaler_CloseIn(t *testing.T) {
130133
close(in)
131134

132135
// Test that the scaler can be used with a nil context.
136+
//nolint:staticcheck
133137
out, err := s.Exec(nil, in)
134138
if err != nil {
135139
t.Errorf("expected no error, got %v", err)

stream.go

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//
44
// It is recommended to use the package via the following import:
55
//
6-
// import . "go.atomizer.io/stream"
6+
// import . "go.atomizer.io/stream"
77
//
88
// Using the `.` import allows for functions to be called directly as if
99
// the functions were in the same namespace without the need to append
@@ -18,13 +18,23 @@ import (
1818
"go.structs.dev/gen"
1919
)
2020

21+
type readonly[T any] interface {
22+
<-chan T | chan T
23+
}
24+
25+
type writeonly[T any] interface {
26+
chan<- T | chan T
27+
}
28+
2129
// Pipe accepts an incoming data channel and pipes it to the supplied
2230
// outgoing data channel.
2331
//
2432
// NOTE: Execute the Pipe function in a goroutine if parallel execution is
25-
// desired. Cancelling the context or closing the incoming channel is important
33+
// desired. Canceling the context or closing the incoming channel is important
2634
// to ensure that the goroutine is properly terminated.
27-
func Pipe[T any](ctx context.Context, in <-chan T, out chan<- T) {
35+
func Pipe[In readonly[T], Out writeonly[T], T any](
36+
ctx context.Context, in In, out Out,
37+
) {
2838
ctx = _ctx(ctx)
2939

3040
for {
@@ -51,10 +61,10 @@ type InterceptFunc[T, U any] func(context.Context, T) (U, bool)
5161
// accepts the incoming data and returns data of the same type and a boolean
5262
// indicating whether the data should be forwarded to the output channel.
5363
// The function is executed for each data item in the incoming channel as long
54-
// as the context is not cancelled or the incoming channel remains open.
55-
func Intercept[T, U any](
64+
// as the context is not canceled or the incoming channel remains open.
65+
func Intercept[In readonly[T], T, U any](
5666
ctx context.Context,
57-
in <-chan T,
67+
in In,
5868
fn InterceptFunc[T, U],
5969
) <-chan U {
6070
ctx = _ctx(ctx)
@@ -100,9 +110,9 @@ func Intercept[T, U any](
100110
// that receives all the data from the supplied channels.
101111
//
102112
// NOTE: The transfer takes place in a goroutine for each channel
103-
// so ensuring that the context is cancelled or the incoming channels
113+
// so ensuring that the context is canceled or the incoming channels
104114
// are closed is important to ensure that the goroutine is terminated.
105-
func FanIn[T any](ctx context.Context, in ...<-chan T) <-chan T {
115+
func FanIn[In readonly[T], T any](ctx context.Context, in ...In) <-chan T {
106116
ctx = _ctx(ctx)
107117
out := make(chan T)
108118

@@ -135,9 +145,11 @@ func FanIn[T any](ctx context.Context, in ...<-chan T) <-chan T {
135145
// supplied outgoing data channels.
136146
//
137147
// NOTE: Execute the FanOut function in a goroutine if parallel execution is
138-
// desired. Cancelling the context or closing the incoming channel is important
148+
// desired. Canceling the context or closing the incoming channel is important
139149
// to ensure that the goroutine is properly terminated.
140-
func FanOut[T any](ctx context.Context, in <-chan T, out ...chan<- T) {
150+
func FanOut[In readonly[T], Out writeonly[T], T any](
151+
ctx context.Context, in In, out ...Out,
152+
) {
141153
ctx = _ctx(ctx)
142154

143155
if len(out) == 0 {
@@ -178,25 +190,26 @@ func FanOut[T any](ctx context.Context, in <-chan T, out ...chan<- T) {
178190
for len(selectCases) > 1 {
179191
chosen, _, _ := reflect.Select(selectCases)
180192

181-
// The context was cancelled.
193+
// The context was canceled.
182194
if chosen == 0 {
183195
return
184196
}
185197

186198
selectCases = gen.Exclude(selectCases, selectCases[chosen])
187199
}
188200
}
189-
190201
}
191202
}
192203

193204
// Distribute accepts an incoming data channel and distributes the data among
194205
// the supplied outgoing data channels using a dynamic select statement.
195206
//
196207
// NOTE: Execute the Distribute function in a goroutine if parallel execution is
197-
// desired. Cancelling the context or closing the incoming channel is important
208+
// desired. Canceling the context or closing the incoming channel is important
198209
// to ensure that the goroutine is properly terminated.
199-
func Distribute[T any](ctx context.Context, in <-chan T, out ...chan<- T) {
210+
func Distribute[In readonly[T], Out writeonly[T], T any](
211+
ctx context.Context, in In, out ...Out,
212+
) {
200213
ctx = _ctx(ctx)
201214

202215
if len(out) == 0 {
@@ -226,6 +239,32 @@ func Distribute[T any](ctx context.Context, in <-chan T, out ...chan<- T) {
226239
})
227240
_, _, _ = reflect.Select(selectCases)
228241
}
229-
230242
}
231243
}
244+
245+
// Drain accepts a channel and drains the channel until the channel is closed
246+
// or the context is canceled.
247+
func Drain[U readonly[T], T any](ctx context.Context, in U) {
248+
ctx = _ctx(ctx)
249+
250+
go func() {
251+
for {
252+
select {
253+
case <-ctx.Done():
254+
return
255+
case _, ok := <-in:
256+
if !ok {
257+
return
258+
}
259+
}
260+
}
261+
}()
262+
}
263+
264+
// Any accepts an incoming data channel and converts the channel to a readonly
265+
// channel of the `any` type.
266+
func Any[U readonly[T], T any](ctx context.Context, in U) <-chan any {
267+
return Intercept(ctx, in, func(_ context.Context, in T) (any, bool) {
268+
return in, true
269+
})
270+
}

stream_bench_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func Benchmark_Pipe(b *testing.B) {
1919
for n := 0; n < b.N; n++ {
2020
select {
2121
case <-ctx.Done():
22-
b.Fatal("context cancelled")
22+
b.Fatal("context canceled")
2323
case c1 <- value:
2424
case out, ok := <-c2:
2525
if !ok {
@@ -47,7 +47,7 @@ func Benchmark_Intercept(b *testing.B) {
4747
for n := 0; n < b.N; n++ {
4848
select {
4949
case <-ctx.Done():
50-
b.Fatal("context cancelled")
50+
b.Fatal("context canceled")
5151
case in <- value:
5252
case out, ok := <-out:
5353
if !ok {
@@ -75,7 +75,7 @@ func Benchmark_FanIn(b *testing.B) {
7575
for i := 0; i < 2; i++ {
7676
select {
7777
case <-ctx.Done():
78-
b.Fatal("context cancelled")
78+
b.Fatal("context canceled")
7979
case _, ok := <-out:
8080
if !ok {
8181
b.Fatal("out closed prematurely")
@@ -134,6 +134,7 @@ func Benchmark_Scaler(b *testing.B) {
134134

135135
for n := 0; n < b.N; n++ {
136136
// Test that the scaler can be used with a nil context.
137+
//nolint:staticcheck
137138
out, err := s.Exec(nil, testdata.Chan(ctx))
138139
if err != nil {
139140
b.Errorf("expected no error, got %v", err)

0 commit comments

Comments
 (0)