diff --git a/internal/api/bulking/bulker.go b/internal/api/bulking/bulker.go index beb5a6b78..f5d703ba9 100644 --- a/internal/api/bulking/bulker.go +++ b/internal/api/bulking/bulker.go @@ -16,6 +16,8 @@ import ( "sync/atomic" ) +var ErrAtomicParallelConflict = errors.New("atomic and parallel options are mutually exclusive") + type Bulker struct { ctrl ledgercontroller.Controller parallelism int @@ -93,7 +95,7 @@ func (b *Bulker) Run(ctx context.Context, bulk Bulk, result chan BulkElementResu defer span.End() if err := bulkOptions.Validate(); err != nil { - return fmt.Errorf("validating bulk options: %s", err) + return fmt.Errorf("validating bulk options: %w", err) } ctrl := b.ctrl @@ -287,7 +289,7 @@ type BulkingOptions struct { func (opts BulkingOptions) Validate() error { if opts.Atomic && opts.Parallel { - return errors.New("atomic and parallel options are mutually exclusive") + return ErrAtomicParallelConflict } return nil diff --git a/internal/api/v2/controllers_bulk.go b/internal/api/v2/controllers_bulk.go index 2d26009e3..9dccece24 100644 --- a/internal/api/v2/controllers_bulk.go +++ b/internal/api/v2/controllers_bulk.go @@ -43,7 +43,12 @@ func bulkHandler(bulkerFactory bulking.BulkerFactory, bulkHandlerFactories map[s }, ) if err != nil { - common.InternalServerError(w, r, err) + switch { + case errors.Is(err, bulking.ErrAtomicParallelConflict): + api.WriteErrorResponse(w, http.StatusPreconditionFailed, common.ErrValidation, err) + default: + common.InternalServerError(w, r, err) + } return } diff --git a/internal/api/v2/controllers_bulk_test.go b/internal/api/v2/controllers_bulk_test.go index 172fec7cb..7108bb65b 100644 --- a/internal/api/v2/controllers_bulk_test.go +++ b/internal/api/v2/controllers_bulk_test.go @@ -2,6 +2,7 @@ package v2 import ( "bytes" + "encoding/json" "fmt" "github.com/formancehq/go-libs/v3/pointer" "github.com/formancehq/ledger/internal/api/bulking" @@ -32,13 +33,13 @@ func TestBulk(t *testing.T) { now := time.Now() type bulkTestCase struct { - name string - queryParams url.Values - body string - expectations func(mockLedger *LedgerController) - expectError bool - expectResults []bulking.APIResult - headers http.Header + name string + queryParams url.Values + body string + expectations func(mockLedger *LedgerController) + expectStatusCode int + expectResults []bulking.APIResult + headers http.Header } testCases := []bulkTestCase{ @@ -295,7 +296,7 @@ func TestBulk(t *testing.T) { ErrorDescription: "context canceled", ResponseType: "ERROR", }}, - expectError: true, + expectStatusCode: http.StatusBadRequest, }, { name: "error in the middle with continue on failure", @@ -379,7 +380,7 @@ func TestBulk(t *testing.T) { }, { ResponseType: bulking.ActionAddMetadata, }}, - expectError: true, + expectStatusCode: http.StatusBadRequest, }, { name: "with atomic", @@ -449,6 +450,37 @@ func TestBulk(t *testing.T) { ResponseType: bulking.ActionAddMetadata, }}, }, + { + name: "with atomic and parallel", + body: `[ + { + "action": "ADD_METADATA", + "data": { + "targetId": "world", + "targetType": "ACCOUNT", + "metadata": { + "foo": "bar" + } + } + }, + { + "action": "ADD_METADATA", + "data": { + "targetId": "world", + "targetType": "ACCOUNT", + "metadata": { + "foo2": "bar2" + } + } + } + ]`, + queryParams: map[string][]string{ + "atomic": {"true"}, + "parallel": {"true"}, + }, + expectations: func(mockLedger *LedgerController) {}, + expectStatusCode: http.StatusPreconditionFailed, + }, { name: "with custom content type", headers: map[string][]string{ @@ -531,21 +563,27 @@ func TestBulk(t *testing.T) { router.ServeHTTP(rec, req) - if testCase.expectError { - require.Equal(t, http.StatusBadRequest, rec.Code) - } else { - require.Equal(t, http.StatusOK, rec.Code) + expectedStatusCode := testCase.expectStatusCode + if expectedStatusCode == 0 { + expectedStatusCode = http.StatusOK } + require.Equal(t, expectedStatusCode, rec.Code) - ret, _ := api.DecodeSingleResponse[[]bulking.APIResult](t, rec.Body) - ret = collectionutils.Map(ret, func(from bulking.APIResult) bulking.APIResult { - switch data := from.Data.(type) { - case map[string]any: - delete(data, "insertedAt") - } - return from - }) - require.Equal(t, testCase.expectResults, ret) + if expectedStatusCode == http.StatusOK || expectedStatusCode == http.StatusBadRequest { + ret, _ := api.DecodeSingleResponse[[]bulking.APIResult](t, rec.Body) + ret = collectionutils.Map(ret, func(from bulking.APIResult) bulking.APIResult { + switch data := from.Data.(type) { + case map[string]any: + delete(data, "insertedAt") + } + return from + }) + require.Equal(t, testCase.expectResults, ret) + } else { + errResponse := api.ErrorResponse{} + err := json.NewDecoder(rec.Body).Decode(&errResponse) + require.NoError(t, err) + } }) } } diff --git a/test/e2e/api_bulk_test.go b/test/e2e/api_bulk_test.go index 9db63d5de..acb463389 100644 --- a/test/e2e/api_bulk_test.go +++ b/test/e2e/api_bulk_test.go @@ -181,6 +181,16 @@ var _ = Context("Ledger engine tests", func() { }) It("should be ok", shouldBeOk) }) + Context("with atomic and parallel", func() { + BeforeEach(func() { + atomic = true + parallel = true + }) + It("should return an error", func() { + Expect(err).To(HaveOccurred()) + Expect(err).To(HaveErrorCode(string(components.V2ErrorsEnumValidation))) + }) + }) Context("with exceeded batch size", func() { BeforeEach(func() { items = make([]components.V2BulkElement, 0)