-
Notifications
You must be signed in to change notification settings - Fork 2.1k
test(sources/s3): fix infinite blocking and timeout issue in TestSource_Chunks #4048
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
1a7abe0
c2d0401
d6655fe
6e020ff
1ca8a95
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,6 @@ import ( | |
| "encoding/base64" | ||
| "fmt" | ||
| "os" | ||
| "sync" | ||
| "testing" | ||
| "time" | ||
|
|
||
|
|
@@ -99,8 +98,7 @@ func TestSource_Chunks(t *testing.T) { | |
| for _, tt := range tests { | ||
| t.Run(tt.name, func(t *testing.T) { | ||
| ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) | ||
| var cancelOnce sync.Once | ||
| defer cancelOnce.Do(cancel) | ||
| defer cancel() | ||
|
|
||
| for k, v := range tt.init.setEnv { | ||
| t.Setenv(k, v) | ||
|
|
@@ -117,26 +115,46 @@ func TestSource_Chunks(t *testing.T) { | |
| t.Errorf("Source.Init() error = %v, wantErr %v", err, tt.wantErr) | ||
| return | ||
| } | ||
| chunksCh := make(chan *sources.Chunk) | ||
| var wg sync.WaitGroup | ||
| wg.Add(1) | ||
| chunksCh := make(chan *sources.Chunk, 1) | ||
| go func() { | ||
| defer wg.Done() | ||
| defer close(chunksCh) | ||
| err = s.Chunks(ctx, chunksCh) | ||
| if (err != nil) != tt.wantErr { | ||
| t.Errorf("Source.Chunks() error = %v, wantErr %v", err, tt.wantErr) | ||
| os.Exit(1) | ||
| } | ||
| }() | ||
| gotChunk := <-chunksCh | ||
| wantData, _ := base64.StdEncoding.DecodeString(tt.wantChunkData) | ||
|
|
||
| if diff := pretty.Compare(gotChunk.Data, wantData); diff != "" { | ||
| t.Errorf("%s: Source.Chunks() diff: (-got +want)\n%s", tt.name, diff) | ||
| waitFn := func() { | ||
| receivedFirstChunk := false | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| t.Errorf("TestSource_Chunks timed out: %v", ctx.Err()) | ||
| case gotChunk, ok := <-chunksCh: | ||
| if !ok { | ||
| t.Logf("Source.Chunks() finished, channel closed") | ||
| assert.Equal(t, "", s.GetProgress().EncodedResumeInfo) | ||
| assert.Equal(t, int64(100), s.GetProgress().PercentComplete) | ||
| return | ||
| } | ||
| if receivedFirstChunk { | ||
| // wantChunkData is the first chunk data. After the first chunk has | ||
| // been received and matched below, we want to drain chunksCh | ||
| // so Source.Chunks() can finish completely. | ||
| continue | ||
| } | ||
|
|
||
| receivedFirstChunk = true | ||
| wantData, _ := base64.StdEncoding.DecodeString(tt.wantChunkData) | ||
|
|
||
| if diff := pretty.Compare(gotChunk.Data, wantData); diff != "" { | ||
| t.Logf("%s: Source.Chunks() diff: (-got +want)\n%s", tt.name, diff) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rosecodym Could you try running the test again with the latest commit (1ca8a95)?
Actually, a If we use a |
||
| wg.Wait() | ||
| assert.Equal(t, "", s.GetProgress().EncodedResumeInfo) | ||
| assert.Equal(t, int64(100), s.GetProgress().PercentComplete) | ||
| waitFn() | ||
| }) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/cc @rosecodym
Here is a more detailed explanation of why
TestSource_Chunksmay block indefinitely.The issue arises after the call to
s.Chunks(ctx, chunksCh). The relevant call stack is:(*Source).Chunkstrufflehog/pkg/sources/s3/s3.go
Lines 390 to 398 in 6d4ccfa
(*Source).scanBucketstrufflehog/pkg/sources/s3/s3.go
Line 363 in 6d4ccfa
(*Source).pageChunkertrufflehog/pkg/sources/s3/s3.go
Lines 579 to 583 in 6d4ccfa
HandleFiletrufflehog/pkg/handlers/handlers.go
Line 390 in 6d4ccfa
handleChunksWithErrortrufflehog/pkg/handlers/handlers.go
Lines 426 to 428 in 6d4ccfa
(ChanReporter).ChunkOktrufflehog/pkg/sources/legacy_reporters.go
Lines 15 to 17 in 6d4ccfa
And blocks infinitely in
CancellableWrite. BecausechunksChis an unbuffered channel, thech <- itemcase can never proceed since we only receive fromchunksChonce.trufflehog/pkg/common/context.go
Lines 23 to 35 in 6d4ccfa
After changing
chunksChto a buffered channel, I created a public S3 bucket and can confirm that the test no longer hangs infinitely:TestSource_Chunk_buffered_channel.mp4