Skip to content

fix(bulk): invalid status code (500) when using atomic + parallel #960

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions internal/api/bulking/bulker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"sync/atomic"
)

var ErrAtomicParallelConflict = errors.New("atomic and parallel options are mutually exclusive")

type Bulker struct {
ctrl ledgercontroller.Controller
parallelism int
Expand Down Expand Up @@ -93,7 +95,7 @@ func (b *Bulker) Run(ctx context.Context, bulk Bulk, result chan BulkElementResu
defer span.End()

if err := bulkOptions.Validate(); err != nil {
return fmt.Errorf("validating bulk options: %s", err)
return fmt.Errorf("validating bulk options: %w", err)
}

ctrl := b.ctrl
Expand Down Expand Up @@ -287,7 +289,7 @@ type BulkingOptions struct {

func (opts BulkingOptions) Validate() error {
if opts.Atomic && opts.Parallel {
return errors.New("atomic and parallel options are mutually exclusive")
return ErrAtomicParallelConflict
}

return nil
Expand Down
7 changes: 6 additions & 1 deletion internal/api/v2/controllers_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@
},
)
if err != nil {
common.InternalServerError(w, r, err)
switch {
case errors.Is(err, bulking.ErrAtomicParallelConflict):
api.WriteErrorResponse(w, http.StatusPreconditionFailed, common.ErrValidation, err)
default:
common.InternalServerError(w, r, err)

Check warning on line 50 in internal/api/v2/controllers_bulk.go

View check run for this annotation

Codecov / codecov/patch

internal/api/v2/controllers_bulk.go#L49-L50

Added lines #L49 - L50 were not covered by tests
}
return
}

Expand Down
82 changes: 60 additions & 22 deletions internal/api/v2/controllers_bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v2

import (
"bytes"
"encoding/json"
"fmt"
"github.com/formancehq/go-libs/v3/pointer"
"github.com/formancehq/ledger/internal/api/bulking"
Expand Down Expand Up @@ -32,13 +33,13 @@ func TestBulk(t *testing.T) {
now := time.Now()

type bulkTestCase struct {
name string
queryParams url.Values
body string
expectations func(mockLedger *LedgerController)
expectError bool
expectResults []bulking.APIResult
headers http.Header
name string
queryParams url.Values
body string
expectations func(mockLedger *LedgerController)
expectStatusCode int
expectResults []bulking.APIResult
headers http.Header
}
Comment on lines +36 to 43
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Defaulting to 200 hides forgotten expectations

Silently replacing zero with 200 can mask missing assertions. Require every test to set expectStatusCode explicitly.

-		expectStatusCode int
+		expectStatusCode int // must be set in every test case; defaults removed

And drop the fallback below.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
name string
queryParams url.Values
body string
expectations func(mockLedger *LedgerController)
expectStatusCode int
expectResults []bulking.APIResult
headers http.Header
}
name string
queryParams url.Values
body string
expectations func(mockLedger *LedgerController)
expectStatusCode int // must be set in every test case; defaults removed
expectResults []bulking.APIResult
headers http.Header
}
🤖 Prompt for AI Agents
In internal/api/v2/controllers_bulk_test.go around lines 36 to 43, the test
struct field expectStatusCode currently defaults to 200 when not set, which can
hide missing assertions. Remove any defaulting logic that sets expectStatusCode
to 200 automatically and require each test case to explicitly set
expectStatusCode to ensure all tests have clear expected status codes.


testCases := []bulkTestCase{
Expand Down Expand Up @@ -295,7 +296,7 @@ func TestBulk(t *testing.T) {
ErrorDescription: "context canceled",
ResponseType: "ERROR",
}},
expectError: true,
expectStatusCode: http.StatusBadRequest,
},
{
name: "error in the middle with continue on failure",
Expand Down Expand Up @@ -379,7 +380,7 @@ func TestBulk(t *testing.T) {
}, {
ResponseType: bulking.ActionAddMetadata,
}},
expectError: true,
expectStatusCode: http.StatusBadRequest,
},
{
name: "with atomic",
Expand Down Expand Up @@ -449,6 +450,37 @@ func TestBulk(t *testing.T) {
ResponseType: bulking.ActionAddMetadata,
}},
},
{
name: "with atomic and parallel",
body: `[
{
"action": "ADD_METADATA",
"data": {
"targetId": "world",
"targetType": "ACCOUNT",
"metadata": {
"foo": "bar"
}
}
},
{
"action": "ADD_METADATA",
"data": {
"targetId": "world",
"targetType": "ACCOUNT",
"metadata": {
"foo2": "bar2"
}
}
}
]`,
queryParams: map[string][]string{
"atomic": {"true"},
"parallel": {"true"},
},
expectations: func(mockLedger *LedgerController) {},
expectStatusCode: http.StatusPreconditionFailed,
},
{
name: "with custom content type",
headers: map[string][]string{
Expand Down Expand Up @@ -531,21 +563,27 @@ func TestBulk(t *testing.T) {

router.ServeHTTP(rec, req)

if testCase.expectError {
require.Equal(t, http.StatusBadRequest, rec.Code)
} else {
require.Equal(t, http.StatusOK, rec.Code)
expectedStatusCode := testCase.expectStatusCode
if expectedStatusCode == 0 {
expectedStatusCode = http.StatusOK
}
require.Equal(t, expectedStatusCode, rec.Code)

ret, _ := api.DecodeSingleResponse[[]bulking.APIResult](t, rec.Body)
ret = collectionutils.Map(ret, func(from bulking.APIResult) bulking.APIResult {
switch data := from.Data.(type) {
case map[string]any:
delete(data, "insertedAt")
}
return from
})
require.Equal(t, testCase.expectResults, ret)
if expectedStatusCode == http.StatusOK || expectedStatusCode == http.StatusBadRequest {
ret, _ := api.DecodeSingleResponse[[]bulking.APIResult](t, rec.Body)
ret = collectionutils.Map(ret, func(from bulking.APIResult) bulking.APIResult {
switch data := from.Data.(type) {
case map[string]any:
delete(data, "insertedAt")
}
return from
})
require.Equal(t, testCase.expectResults, ret)
} else {
errResponse := api.ErrorResponse{}
err := json.NewDecoder(rec.Body).Decode(&errResponse)
require.NoError(t, err)
}
})
}
}
10 changes: 10 additions & 0 deletions test/e2e/api_bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,16 @@ var _ = Context("Ledger engine tests", func() {
})
It("should be ok", shouldBeOk)
})
Context("with atomic and parallel", func() {
BeforeEach(func() {
atomic = true
parallel = true
})
It("should return an error", func() {
Expect(err).To(HaveOccurred())
Expect(err).To(HaveErrorCode(string(components.V2ErrorsEnumValidation)))
})
})
Context("with exceeded batch size", func() {
BeforeEach(func() {
items = make([]components.V2BulkElement, 0)
Expand Down
Loading