-
Notifications
You must be signed in to change notification settings - Fork 2.1k
test(sources/s3): fix infinite blocking and timeout issue in TestSource_Chunks #4048
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
test(sources/s3): fix infinite blocking and timeout issue in TestSource_Chunks #4048
Conversation
Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
|
@ahrav any idea which one is the test we want to keep? |
Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
| chunksCh := make(chan *sources.Chunk, 1) | ||
| go func() { | ||
| defer wg.Done() | ||
| defer close(chunksCh) | ||
| err = s.Chunks(ctx, chunksCh) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/cc @rosecodym
Here is a more detailed explanation of why TestSource_Chunks may block indefinitely.
The issue arises after the call to s.Chunks(ctx, chunksCh). The relevant call stack is:
-
(*Source).Chunkstrufflehog/pkg/sources/s3/s3.go
Lines 390 to 398 in 6d4ccfa
// Chunks emits chunks of bytes over a channel. func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, _ ...sources.ChunkingTarget) error { visitor := func(c context.Context, defaultRegionClient *s3.S3, roleArn string, buckets []string) error { s.scanBuckets(c, defaultRegionClient, roleArn, buckets, chunksChan) return nil } return s.visitRoles(ctx, visitor) } -
(*Source).scanBucketstrufflehog/pkg/sources/s3/s3.go
Line 363 in 6d4ccfa
s.pageChunker(ctx, pageMetadata, processingState, chunksChan) -
(*Source).pageChunkertrufflehog/pkg/sources/s3/s3.go
Lines 579 to 583 in 6d4ccfa
if err := handlers.HandleFile(ctx, res.Body, chunkSkel, sources.ChanReporter{Ch: chunksChan}); err != nil { ctx.Logger().Error(err, "error handling file") s.metricsCollector.RecordObjectError(metadata.bucket) return nil } -
HandleFiletrufflehog/pkg/handlers/handlers.go
Line 390 in 6d4ccfa
return handleChunksWithError(processingCtx, dataOrErrChan, chunkSkel, reporter) -
handleChunksWithErrortrufflehog/pkg/handlers/handlers.go
Lines 426 to 428 in 6d4ccfa
if err := reporter.ChunkOk(ctx, chunk); err != nil { return fmt.Errorf("error reporting chunk: %w", err) } -
(ChanReporter).ChunkOktrufflehog/pkg/sources/legacy_reporters.go
Lines 15 to 17 in 6d4ccfa
func (c ChanReporter) ChunkOk(ctx context.Context, chunk Chunk) error { return common.CancellableWrite(ctx, c.Ch, &chunk) } -
And blocks infinitely in
CancellableWrite. BecausechunksChis an unbuffered channel, thech <- itemcase can never proceed since we only receive fromchunksChonce.trufflehog/pkg/common/context.go
Lines 23 to 35 in 6d4ccfa
func CancellableWrite[T any](ctx context.Context, ch chan<- T, item T) error { select { case <-ctx.Done(): // priority to context cancellation return ctx.Err() default: select { case <-ctx.Done(): return ctx.Err() case ch <- item: return nil } } }
After changing chunksCh to a buffered channel, I created a public S3 bucket and can confirm that the test no longer hangs infinitely:
TestSource_Chunk_buffered_channel.mp4
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Juneezee thank you for your detailed investigation!
actually, i'm seeing some new test failures :/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this! Unfortunately, it introduces a new test failure:
=== RUN TestSource_Chunks/gets_chunks_after_assuming_role
2025-04-23T09:40:41-04:00 info-0 context Creating checkpointer {"timeout": 30}
2025-04-23T09:40:42-04:00 info-0 context Starting new scan from beginning {"timeout": 30, "role": "arn:aws:iam::619888638459:role/s3-test-assume-role"}
s3_test.go:138:
Error Trace: /Users/cody.rose/src/trufflehog/pkg/sources/s3/s3_test.go:138
Error: Not equal:
expected: ""
actual : "{\"current_bucket\":\"truffletestbucket-s3-role-assumption\",\"start_after\":\"s\"}"
Diff:
--- Expected
+++ Actual
@@ -1 +1 @@
-
+{"current_bucket":"truffletestbucket-s3-role-assumption","start_after":"s"}
Test: TestSource_Chunks/gets_chunks_after_assuming_role
s3_test.go:139:
Error Trace: /Users/cody.rose/src/trufflehog/pkg/sources/s3/s3_test.go:139
Error: Not equal:
expected: 100
actual : 90
Test: TestSource_Chunks/gets_chunks_after_assuming_role
--- FAIL: TestSource_Chunks (6.15s)
--- FAIL: TestSource_Chunks/gets_chunks_after_assuming_role (5.66s)
The problem is that the test no longer waits for Chunks to return before checking the source's resumption info and progress completion, so there's a mismatch. I believe this can be resolved by re-adding the removed waitgroup functionality.
Reference: #4048 (review) Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
| waitFn := func() { | ||
| receivedFirstChunk := false | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| t.Errorf("TestSource_Chunks timed out: %v", ctx.Err()) | ||
| case gotChunk, ok := <-chunksCh: | ||
| if !ok { | ||
| t.Logf("Source.Chunks() finished, channel closed") | ||
| assert.Equal(t, "", s.GetProgress().EncodedResumeInfo) | ||
| assert.Equal(t, int64(100), s.GetProgress().PercentComplete) | ||
| return | ||
| } | ||
| if receivedFirstChunk { | ||
| // wantChunkData is the first chunk data. After the first chunk has | ||
| // been received and matched below, we want to drain chunksCh | ||
| // so Source.Chunks() can finish completely. | ||
| continue | ||
| } | ||
|
|
||
| receivedFirstChunk = true | ||
| wantData, _ := base64.StdEncoding.DecodeString(tt.wantChunkData) | ||
|
|
||
| if diff := pretty.Compare(gotChunk.Data, wantData); diff != "" { | ||
| t.Logf("%s: Source.Chunks() diff: (-got +want)\n%s", tt.name, diff) | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rosecodym Could you try running the test again with the latest commit (1ca8a95)?
I believe this can be resolved by re-adding the removed waitgroup functionality.
Actually, a WaitGroup alone is not sufficient here. For s.Chunks(ctx, chunksCh) to finish completely, the chunksCh channel must be fully drained.
If we use a WaitGroup without draining chunksCh, the test will still block indefinitely, unless chunksCh is a buffered channel with a large enough buffer to hold all the chunks.
Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
|
I see passing tests now! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for cleaning this up!
Description:
This PR addresses two issues:
Duplicate test functions in
s3_integration_test.go.The file currently contains two separate
TestSourceChunksNoResumption. The only differences between them are the use oft.Parallel()in one and the bucket name. This PR merges both into a single, table-driven testtrufflehog/pkg/sources/s3/s3_integration_test.go
Lines 85 to 114 in a1243a4
trufflehog/pkg/sources/s3/s3_integration_test.go
Lines 251 to 282 in a1243a4
Possible infinite block in
TestSource_ChunksAs reported in feat(sources/s3): migrate to AWS SDK v2 #4069 (comment), the test
TestSource_Chunksins3_test.gomay block indefinitely. This is due to the use of an unbuffered channel (chunksCh), combined with only a single receive operation (gotChunk := <-chunksCh). If the test bucket contains more than one chunk, thes.Chunks(ctx, chunksCh)call will block, causing the test to hang.See test(sources/s3): fix infinite blocking and timeout issue in TestSource_Chunks #4048 (comment) for full explanation.
trufflehog/pkg/sources/s3/s3_test.go
Lines 120 to 131 in a1243a4
Checklist:
make test-community)?make lintthis requires golangci-lint)?