Skip to content

Conversation


@Juneezee Juneezee commented Apr 14, 2025

Description:

This PR addresses two issues:

  1. Duplicate test functions in s3_integration_test.go.

    The file currently contains two separate TestSourceChunksNoResumption functions. The only differences between them are the bucket name and the use of t.Parallel() in one. This PR merges both into a single, table-driven test.


    func TestSourceChunksNoResumption(t *testing.T) {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
        defer cancel()

        s := Source{}
        connection := &sourcespb.S3{
            Credential: &sourcespb.S3_Unauthenticated{},
            Buckets:    []string{"trufflesec-ahrav-test-2"},
        }
        conn, err := anypb.New(connection)
        if err != nil {
            t.Fatal(err)
        }

        err = s.Init(ctx, "test name", 0, 0, false, conn, 1)
        chunksCh := make(chan *sources.Chunk)
        go func() {
            defer close(chunksCh)
            err = s.Chunks(ctx, chunksCh)
            assert.Nil(t, err)
        }()

        wantChunkCount := 19787
        got := 0
        for range chunksCh {
            got++
        }
        assert.Equal(t, got, wantChunkCount)
    }

    func TestSourceChunksNoResumption(t *testing.T) {
        t.Parallel()
        ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
        defer cancel()

        s := Source{}
        connection := &sourcespb.S3{
            Credential: &sourcespb.S3_Unauthenticated{},
            Buckets:    []string{"integration-resumption-tests"},
        }
        conn, err := anypb.New(connection)
        if err != nil {
            t.Fatal(err)
        }

        err = s.Init(ctx, "test name", 0, 0, false, conn, 1)
        chunksCh := make(chan *sources.Chunk)
        go func() {
            defer close(chunksCh)
            err = s.Chunks(ctx, chunksCh)
            assert.Nil(t, err)
        }()

        wantChunkCount := 19787
        got := 0
        for range chunksCh {
            got++
        }
        assert.Equal(t, wantChunkCount, got)
    }
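
    A minimal, stdlib-only sketch of the merged table-driven shape (countChunks and its fake per-bucket counts are hypothetical stand-ins; the real test drives Source.Chunks against live buckets and counts chunks received from the channel):

    ```go
    package main

    import "fmt"

    // countChunks is a hypothetical stand-in for counting the chunks emitted by
    // Source.Chunks for a bucket; the counts here are fake, for illustration only.
    func countChunks(bucket string) int {
        counts := map[string]int{
            "trufflesec-ahrav-test-2":      19787,
            "integration-resumption-tests": 19787,
        }
        return counts[bucket]
    }

    func main() {
        // One table entry per former duplicate test; in the real test each
        // entry runs under t.Run as its own subtest.
        tests := []struct {
            name           string
            bucket         string
            wantChunkCount int
        }{
            {"no resumption", "trufflesec-ahrav-test-2", 19787},
            {"no resumption, parallel", "integration-resumption-tests", 19787},
        }
        for _, tt := range tests {
            got := countChunks(tt.bucket)
            fmt.Printf("%s: got=%d want=%d ok=%v\n", tt.name, got, tt.wantChunkCount, got == tt.wantChunkCount)
        }
    }
    ```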

  2. Possible infinite block in TestSource_Chunks

    As reported in feat(sources/s3): migrate to AWS SDK v2 #4069 (comment), the test TestSource_Chunks in s3_test.go may block indefinitely. This is due to the use of an unbuffered channel (chunksCh), combined with only a single receive operation (gotChunk := <-chunksCh). If the test bucket contains more than one chunk, the s.Chunks(ctx, chunksCh) call will block, causing the test to hang.

    See test(sources/s3): fix infinite blocking and timeout issue in TestSource_Chunks #4048 (comment) for full explanation.

    chunksCh := make(chan *sources.Chunk)
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        err = s.Chunks(ctx, chunksCh)
        if (err != nil) != tt.wantErr {
            t.Errorf("Source.Chunks() error = %v, wantErr %v", err, tt.wantErr)
            os.Exit(1)
        }
    }()
    gotChunk := <-chunksCh
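
    The hang can be reproduced with a stdlib-only sketch. cancellableWrite below is a simplified stand-in for common.CancellableWrite, and the producer loop stands in for s.Chunks emitting multiple chunks; only the first send finds a receiver, and the second blocks until the context times out:

    ```go
    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // cancellableWrite mirrors the select pattern of common.CancellableWrite:
    // the send proceeds only when a receiver is ready (or the channel has buffer).
    func cancellableWrite(ctx context.Context, ch chan<- int, item int) error {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case ch <- item:
            return nil
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
        defer cancel()

        chunksCh := make(chan int) // unbuffered, as in the original test
        errCh := make(chan error, 1)
        go func() {
            // Producer sends two items, like Chunks emitting more than one chunk.
            for i := 0; i < 2; i++ {
                if err := cancellableWrite(ctx, chunksCh, i); err != nil {
                    errCh <- err
                    return
                }
            }
            errCh <- nil
        }()

        fmt.Println("received:", <-chunksCh) // the test's single receive
        // The second send has no receiver; only the context timeout unblocks it.
        fmt.Println("producer error:", <-errCh)
    }
    ```

    With a receiver for only the first item, the producer's second send returns the context error instead of blocking forever; without the context timeout, it would never return.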

Checklist:

  • Tests passing (make test-community)?
  • Lint passing (make lint; this requires golangci-lint)?

Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
@Juneezee Juneezee requested review from a team as code owners April 14, 2025 16:35
@zricethezav
Contributor

@ahrav any idea which one is the test we want to keep?

Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
@Juneezee Juneezee changed the title test(sources/s3): remove duplicate integration test case test(sources/s3): fix infinite blocking and timeout issue in TestSource_Chunks Apr 22, 2025
Comment on lines +118 to 121
chunksCh := make(chan *sources.Chunk, 1)
go func() {
    defer wg.Done()
    defer close(chunksCh)
    err = s.Chunks(ctx, chunksCh)
Contributor Author


/cc @rosecodym

Here is a more detailed explanation of why TestSource_Chunks may block indefinitely.

The issue arises after the call to s.Chunks(ctx, chunksCh). The relevant call stack is:

  1. (*Source).Chunks

    // Chunks emits chunks of bytes over a channel.
    func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, _ ...sources.ChunkingTarget) error {
    visitor := func(c context.Context, defaultRegionClient *s3.S3, roleArn string, buckets []string) error {
    s.scanBuckets(c, defaultRegionClient, roleArn, buckets, chunksChan)
    return nil
    }
    return s.visitRoles(ctx, visitor)
    }

  2. (*Source).scanBuckets

    s.pageChunker(ctx, pageMetadata, processingState, chunksChan)

  3. (*Source).pageChunker

    if err := handlers.HandleFile(ctx, res.Body, chunkSkel, sources.ChanReporter{Ch: chunksChan}); err != nil {
        ctx.Logger().Error(err, "error handling file")
        s.metricsCollector.RecordObjectError(metadata.bucket)
        return nil
    }

  4. HandleFile

    return handleChunksWithError(processingCtx, dataOrErrChan, chunkSkel, reporter)

  5. handleChunksWithError

    if err := reporter.ChunkOk(ctx, chunk); err != nil {
        return fmt.Errorf("error reporting chunk: %w", err)
    }

  6. (ChanReporter).ChunkOk

    func (c ChanReporter) ChunkOk(ctx context.Context, chunk Chunk) error {
        return common.CancellableWrite(ctx, c.Ch, &chunk)
    }

  7. The write then blocks indefinitely in CancellableWrite: because chunksCh is an unbuffered channel and we only receive from it once, the ch <- item case can never proceed.

    func CancellableWrite[T any](ctx context.Context, ch chan<- T, item T) error {
        select {
        case <-ctx.Done(): // priority to context cancellation
            return ctx.Err()
        default:
            select {
            case <-ctx.Done():
                return ctx.Err()
            case ch <- item:
                return nil
            }
        }
    }

After changing chunksCh to a buffered channel, I created a public S3 bucket and can confirm that the test no longer hangs infinitely:

(video attachment: TestSource_Chunk_buffered_channel.mp4)

rosecodym previously approved these changes Apr 23, 2025
Contributor

@rosecodym rosecodym left a comment


@Juneezee thank you for your detailed investigation!

@rosecodym rosecodym dismissed their stale review April 23, 2025 13:34

actually, i'm seeing some new test failures :/


@rosecodym rosecodym left a comment


Thanks for this! Unfortunately, it introduces a new test failure:

=== RUN   TestSource_Chunks/gets_chunks_after_assuming_role
2025-04-23T09:40:41-04:00	info-0	context	Creating checkpointer	{"timeout": 30}
2025-04-23T09:40:42-04:00	info-0	context	Starting new scan from beginning	{"timeout": 30, "role": "arn:aws:iam::619888638459:role/s3-test-assume-role"}
    s3_test.go:138: 
        	Error Trace:	/Users/cody.rose/src/trufflehog/pkg/sources/s3/s3_test.go:138
        	Error:      	Not equal: 
        	            	expected: ""
        	            	actual  : "{\"current_bucket\":\"truffletestbucket-s3-role-assumption\",\"start_after\":\"s\"}"
        	            	
        	            	Diff:
        	            	--- Expected
        	            	+++ Actual
        	            	@@ -1 +1 @@
        	            	-
        	            	+{"current_bucket":"truffletestbucket-s3-role-assumption","start_after":"s"}
        	Test:       	TestSource_Chunks/gets_chunks_after_assuming_role
    s3_test.go:139: 
        	Error Trace:	/Users/cody.rose/src/trufflehog/pkg/sources/s3/s3_test.go:139
        	Error:      	Not equal: 
        	            	expected: 100
        	            	actual  : 90
        	Test:       	TestSource_Chunks/gets_chunks_after_assuming_role
--- FAIL: TestSource_Chunks (6.15s)
    --- FAIL: TestSource_Chunks/gets_chunks_after_assuming_role (5.66s)

The problem is that the test no longer waits for Chunks to return before checking the source's resumption info and progress completion, so there's a mismatch. I believe this can be resolved by re-adding the removed waitgroup functionality.

Reference: #4048 (review)
Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
Comment on lines 128 to 156
waitFn := func() {
    receivedFirstChunk := false
    for {
        select {
        case <-ctx.Done():
            t.Errorf("TestSource_Chunks timed out: %v", ctx.Err())
        case gotChunk, ok := <-chunksCh:
            if !ok {
                t.Logf("Source.Chunks() finished, channel closed")
                assert.Equal(t, "", s.GetProgress().EncodedResumeInfo)
                assert.Equal(t, int64(100), s.GetProgress().PercentComplete)
                return
            }
            if receivedFirstChunk {
                // wantChunkData is the first chunk data. After the first chunk has
                // been received and matched below, we want to drain chunksCh
                // so Source.Chunks() can finish completely.
                continue
            }

            receivedFirstChunk = true
            wantData, _ := base64.StdEncoding.DecodeString(tt.wantChunkData)

            if diff := pretty.Compare(gotChunk.Data, wantData); diff != "" {
                t.Logf("%s: Source.Chunks() diff: (-got +want)\n%s", tt.name, diff)
            }
        }
    }
}

@Juneezee Juneezee Apr 23, 2025


@rosecodym Could you try running the test again with the latest commit (1ca8a95)?

I believe this can be resolved by re-adding the removed waitgroup functionality.

Actually, a WaitGroup alone is not sufficient here. For s.Chunks(ctx, chunksCh) to finish completely, the chunksCh channel must be fully drained.

If we use a WaitGroup without draining chunksCh, the test will still block indefinitely, unless chunksCh is a buffered channel with a large enough buffer to hold all the chunks.

Signed-off-by: Eng Zer Jun <engzerjun@gmail.com>
@Juneezee Juneezee requested a review from rosecodym April 23, 2025 18:08
@rosecodym

I see passing tests now!


@rosecodym rosecodym left a comment


Thanks for cleaning this up!

@rosecodym rosecodym merged commit a4838d4 into trufflesecurity:main Apr 24, 2025
13 checks passed