diff --git a/internal/api/bulking/bulker_test.go b/internal/api/bulking/bulker_test.go index 58b1f3c65c..1af89ed5e6 100644 --- a/internal/api/bulking/bulker_test.go +++ b/internal/api/bulking/bulker_test.go @@ -3,6 +3,7 @@ package bulking import ( "encoding/json" "github.com/formancehq/go-libs/v2/logging" + "github.com/formancehq/go-libs/v2/pointer" "github.com/uptrace/bun" "math/big" "testing" @@ -61,7 +62,9 @@ func TestBulk(t *testing.T) { Timestamp: now, }, false), }). - Return(&ledger.Log{}, &ledger.CreatedTransaction{ + Return(&ledger.Log{ + ID: pointer.For(1), + }, &ledger.CreatedTransaction{ Transaction: ledger.Transaction{ TransactionData: ledger.TransactionData{ Postings: postings, @@ -105,7 +108,9 @@ func TestBulk(t *testing.T) { }, }, }). - Return(&ledger.Log{}, nil) + Return(&ledger.Log{ + ID: pointer.For(1), + }, nil) }, expectResults: []BulkElementResult{{}}, }, @@ -131,7 +136,9 @@ func TestBulk(t *testing.T) { }, }, }). - Return(&ledger.Log{}, nil) + Return(&ledger.Log{ + ID: pointer.For(1), + }, nil) }, expectResults: []BulkElementResult{{}}, }, @@ -150,7 +157,9 @@ func TestBulk(t *testing.T) { TransactionID: 1, }, }). - Return(&ledger.Log{}, &ledger.RevertedTransaction{}, nil) + Return(&ledger.Log{ + ID: pointer.For(1), + }, &ledger.RevertedTransaction{}, nil) }, expectResults: []BulkElementResult{{ Data: ledger.Transaction{}, @@ -174,7 +183,9 @@ func TestBulk(t *testing.T) { Key: "foo", }, }). - Return(&ledger.Log{}, nil) + Return(&ledger.Log{ + ID: pointer.For(1), + }, nil) }, expectResults: []BulkElementResult{{}}, }, @@ -196,7 +207,9 @@ func TestBulk(t *testing.T) { Key: "foo", }, }). - Return(&ledger.Log{}, nil) + Return(&ledger.Log{ + ID: pointer.For(1), + }, nil) }, expectResults: []BulkElementResult{{}}, }, @@ -240,7 +253,9 @@ func TestBulk(t *testing.T) { }, }, }). - Return(&ledger.Log{}, nil) + Return(&ledger.Log{ + ID: pointer.For(1), + }, nil) mockLedger.EXPECT(). SaveAccountMetadata(gomock.Any(), ledgercontroller.Parameters[ledgercontroller.SaveAccountMetadata]{ Input: ledgercontroller.SaveAccountMetadata{ @@ -300,7 +315,9 @@ func TestBulk(t *testing.T) { }, }, }). - Return(&ledger.Log{}, nil) + Return(&ledger.Log{ + ID: pointer.For(1), + }, nil) mockLedger.EXPECT(). SaveAccountMetadata(gomock.Any(), ledgercontroller.Parameters[ledgercontroller.SaveAccountMetadata]{ Input: ledgercontroller.SaveAccountMetadata{ @@ -320,7 +337,9 @@ func TestBulk(t *testing.T) { }, }, }). - Return(&ledger.Log{}, nil) + Return(&ledger.Log{ + ID: pointer.For(1), + }, nil) }, expectResults: []BulkElementResult{{}, { Error: errors.New("unexpected error"), @@ -365,7 +384,9 @@ func TestBulk(t *testing.T) { }, }, }). - Return(&ledger.Log{}, nil) + Return(&ledger.Log{ + ID: pointer.For(1), + }, nil) mockLedger.EXPECT(). SaveAccountMetadata(gomock.Any(), ledgercontroller.Parameters[ledgercontroller.SaveAccountMetadata]{ @@ -376,7 +397,9 @@ func TestBulk(t *testing.T) { }, }, }). - Return(&ledger.Log{}, nil) + Return(&ledger.Log{ + ID: pointer.For(1), + }, nil) mockLedger.EXPECT(). Commit(gomock.Any()). diff --git a/internal/api/bulking/handler_stream_json.go b/internal/api/bulking/handler_stream_json.go new file mode 100644 index 0000000000..2caa534e44 --- /dev/null +++ b/internal/api/bulking/handler_stream_json.go @@ -0,0 +1,86 @@ +package bulking + +import ( + "encoding/json" + "net/http" +) + +type JSONStreamBulkHandler struct { + channel Bulk + terminated chan struct{} + receive chan BulkElementResult + results []BulkElementResult + actions []string + err error +} + +func (h *JSONStreamBulkHandler) GetChannels(_ http.ResponseWriter, r *http.Request) (Bulk, chan BulkElementResult, bool) { + + h.channel = make(Bulk) + h.receive = make(chan BulkElementResult) + h.terminated = make(chan struct{}) + + go func() { + defer close(h.channel) + + dec := json.NewDecoder(r.Body) + + for { + select { + case <-r.Context().Done(): + return + default: + nextElement := &BulkElement{} + err := dec.Decode(nextElement) + if err != nil { + h.err = err + return + } + + h.actions = append(h.actions, nextElement.GetAction()) + h.channel <- *nextElement + } + } + }() + go func() { + defer close(h.terminated) + + for { + select { + case <-r.Context().Done(): + return + case res, ok := <-h.receive: + if !ok { + return + } + h.results = append(h.results, res) + } + } + }() + + return h.channel, h.receive, true +} + +func (h *JSONStreamBulkHandler) Terminate(w http.ResponseWriter, r *http.Request) { + select { + case <-h.terminated: + writeJSONResponse(w, h.actions, h.results, h.err) + case <-r.Context().Done(): + } +} + +func NewJSONStreamBulkHandler() *JSONStreamBulkHandler { + return &JSONStreamBulkHandler{} +} + +type JSONStreamBulkHandlerFactory struct{} + +func (j JSONStreamBulkHandlerFactory) CreateBulkHandler() Handler { + return NewJSONStreamBulkHandler() +} + +func NewJSONStreamBulkHandlerFactory() HandlerFactory { + return &JSONStreamBulkHandlerFactory{} +} + +var _ HandlerFactory = (*JSONStreamBulkHandlerFactory)(nil) diff --git a/internal/api/bulking/handler_stream_json_test.go b/internal/api/bulking/handler_stream_json_test.go new file mode 100644 index 0000000000..0c7856c32a --- /dev/null +++ b/internal/api/bulking/handler_stream_json_test.go @@ -0,0 +1,93 @@ +package bulking + +import ( + "github.com/formancehq/go-libs/v2/api" + ledger "github.com/formancehq/ledger/internal" + "github.com/stretchr/testify/require" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestBulkStreamHandlerJSON(t *testing.T) { + + t.Parallel() + + type testCase struct { + name string + stream string + expectedError bool + expectedStatusCode int + + expectScriptCount int + } + + for _, testCase := range []testCase{ + { + name: "nominal", + stream: ` +{"action": "CREATE_TRANSACTION", "data": {"postings": [{"source": "world", "amount": 100, "asset": "USD", "destination": "bank"}]}} +{"action": "CREATE_TRANSACTION", "data": {"postings": [{"source": "world", "amount": 200, "asset": "USD", "destination": "bank"}]}} +`, + expectScriptCount: 2, + }, + } { + t.Run(testCase.name, func(t *testing.T) { + t.Parallel() + + reader, writer := io.Pipe() + + w := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodPost, "/", reader) + + h := NewJSONStreamBulkHandler() + send, receive, ok := h.GetChannels(w, r) + + if testCase.expectedError { + require.False(t, ok) + require.Equal(t, testCase.expectedStatusCode, w.Result().StatusCode) + return + } + + require.True(t, ok) + + _, err := writer.Write([]byte(testCase.stream)) + require.NoError(t, err) + + for id := range testCase.expectScriptCount { + select { + case <-send: + case <-time.After(100 * time.Millisecond): + t.Fatal("should have received send channel") + } + select { + case receive <- BulkElementResult{ + Data: ledger.CreatedTransaction{}, + LogID: id + 1, + ElementID: id, + }: + case <-time.After(100 * time.Millisecond): + t.Fatal("should have been able to send on receive channel") + } + } + + require.NoError(t, writer.Close()) + select { + case <-send: + case <-time.After(100 * time.Millisecond): + t.Fatal("send channel should have been closed") + } + close(receive) + + h.Terminate(w, r) + + require.Equal(t, http.StatusOK, w.Result().StatusCode) + + response, ok := api.DecodeSingleResponse[[]APIResult](t, w.Result().Body) + require.True(t, ok) + require.Len(t, response, testCase.expectScriptCount) + }) + } +} diff --git a/internal/api/bulking/handler_text.go b/internal/api/bulking/handler_stream_text.go similarity index 63% rename from internal/api/bulking/handler_text.go rename to internal/api/bulking/handler_stream_text.go index a0a0d9e4c5..64a0797d51 100644 --- a/internal/api/bulking/handler_text.go +++ b/internal/api/bulking/handler_stream_text.go @@ -5,7 +5,7 @@ import ( "net/http" ) -type ScriptStreamBulkHandler struct { +type TextStreamBulkHandler struct { channel Bulk terminated chan struct{} receive chan BulkElementResult @@ -14,7 +14,7 @@ type ScriptStreamBulkHandler struct { err error } -func (h *ScriptStreamBulkHandler) GetChannels(_ http.ResponseWriter, r *http.Request) (Bulk, chan BulkElementResult, bool) { +func (h *TextStreamBulkHandler) GetChannels(_ http.ResponseWriter, r *http.Request) (Bulk, chan BulkElementResult, bool) { h.channel = make(Bulk) h.receive = make(chan BulkElementResult) @@ -65,7 +65,7 @@ func (h *ScriptStreamBulkHandler) GetChannels(_ http.ResponseWriter, r *http.Req return h.channel, h.receive, true } -func (h *ScriptStreamBulkHandler) Terminate(w http.ResponseWriter, r *http.Request) { +func (h *TextStreamBulkHandler) Terminate(w http.ResponseWriter, r *http.Request) { select { case <-h.terminated: writeJSONResponse(w, h.actions, h.results, h.err) @@ -73,18 +73,18 @@ func (h *ScriptStreamBulkHandler) Terminate(w http.ResponseWriter, r *http.Reque } } -func NewScriptStreamBulkHandler() *ScriptStreamBulkHandler { - return &ScriptStreamBulkHandler{} +func NewTextStreamBulkHandler() *TextStreamBulkHandler { + return &TextStreamBulkHandler{} } -type scriptStreamBulkHandlerFactory struct{} +type textStreamBulkHandlerFactory struct{} -func (j scriptStreamBulkHandlerFactory) CreateBulkHandler() Handler { - return NewScriptStreamBulkHandler() +func (j textStreamBulkHandlerFactory) CreateBulkHandler() Handler { + return NewTextStreamBulkHandler() } -func NewScriptStreamBulkHandlerFactory() HandlerFactory { - return &scriptStreamBulkHandlerFactory{} +func NewTextStreamBulkHandlerFactory() HandlerFactory { + return &textStreamBulkHandlerFactory{} } -var _ HandlerFactory = (*scriptStreamBulkHandlerFactory)(nil) +var _ HandlerFactory = (*textStreamBulkHandlerFactory)(nil) diff --git a/internal/api/bulking/handler_text_test.go b/internal/api/bulking/handler_stream_text_test.go similarity index 92% rename from internal/api/bulking/handler_text_test.go rename to internal/api/bulking/handler_stream_text_test.go index af1d2562c5..f5e79b87f7 100644 --- a/internal/api/bulking/handler_text_test.go +++ b/internal/api/bulking/handler_stream_text_test.go @@ -52,7 +52,7 @@ send [USD 100] ( w := httptest.NewRecorder() r := httptest.NewRequest(http.MethodPost, "/", reader) - h := NewScriptStreamBulkHandler() + h := NewTextStreamBulkHandler() send, receive, ok := h.GetChannels(w, r) if testCase.expectedError { @@ -84,6 +84,11 @@ send [USD 100] ( } require.NoError(t, writer.Close()) + select { + case <-send: + case <-time.After(100 * time.Millisecond): + t.Fatal("send channel should have been closed") + } close(receive) h.Terminate(w, r) diff --git a/internal/api/router.go b/internal/api/router.go index 686cfa973a..cc8392dd43 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -84,10 +84,7 @@ func NewRouter( v2.WithTracer(routerOptions.tracer), v2.WithMiddlewares(commonMiddlewares...), v2.WithBulkerFactory(routerOptions.bulkerFactory), - v2.WithBulkHandlerFactories(map[string]bulking.HandlerFactory{ - "application/json": bulking.NewJSONBulkHandlerFactory(routerOptions.bulkMaxSize), - "application/vnd.formance.ledger.api.v2.bulk+script-stream": bulking.NewScriptStreamBulkHandlerFactory(), - }), + v2.WithDefaultBulkHandlerFactories(routerOptions.bulkMaxSize), v2.WithPaginationConfig(routerOptions.paginationConfig), ) mux.Handle("/v2*", http.StripPrefix("/v2", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/api/v2/routes.go b/internal/api/v2/routes.go index 88ee759a28..3a182694d2 100644 --- a/internal/api/v2/routes.go +++ b/internal/api/v2/routes.go @@ -138,13 +138,18 @@ func WithPaginationConfig(paginationConfig common.PaginationConfig) RouterOption } } +func WithDefaultBulkHandlerFactories(bulkMaxSize int) RouterOption { + return WithBulkHandlerFactories(map[string]bulking.HandlerFactory{ + "application/json": bulking.NewJSONBulkHandlerFactory(bulkMaxSize), + "application/vnd.formance.ledger.api.v2.bulk+script-stream": bulking.NewTextStreamBulkHandlerFactory(), + "application/vnd.formance.ledger.api.v2.bulk+json-stream": bulking.NewJSONStreamBulkHandlerFactory(), + }) +} + var defaultRouterOptions = []RouterOption{ WithTracer(nooptracer.Tracer{}), WithBulkerFactory(bulking.NewDefaultBulkerFactory()), - WithBulkHandlerFactories(map[string]bulking.HandlerFactory{ - "application/json": bulking.NewJSONBulkHandlerFactory(100), - "application/vnd.formance.ledger.api.v2.bulk+script-stream": bulking.NewScriptStreamBulkHandlerFactory(), - }), + WithDefaultBulkHandlerFactories(100), WithPaginationConfig(common.PaginationConfig{ DefaultPageSize: bunpaginate.QueryDefaultPageSize, MaxPageSize: bunpaginate.MaxPageSize, diff --git a/test/e2e/api_bulk_test.go b/test/e2e/api_bulk_test.go index 9b3850e1d3..b93b2d6ebb 100644 --- a/test/e2e/api_bulk_test.go +++ b/test/e2e/api_bulk_test.go @@ -3,12 +3,15 @@ package test_suite import ( + "bytes" + "encoding/json" "github.com/formancehq/go-libs/v2/pointer" ledger "github.com/formancehq/ledger/internal" "github.com/formancehq/ledger/internal/bus" ledgerevents "github.com/formancehq/ledger/pkg/events" "github.com/nats-io/nats.go" "math/big" + "net/http" "time" "github.com/formancehq/go-libs/v2/logging" @@ -193,6 +196,67 @@ var _ = Context("Ledger engine tests", func() { }) }) }) + When("creating a bulk on a ledger using json stream", func() { + var ( + now = time.Now().Round(time.Microsecond).UTC() + items []components.V2BulkElement + err error + ) + BeforeEach(func() { + items = []components.V2BulkElement{ + components.CreateV2BulkElementCreateTransaction(components.V2BulkElementCreateTransaction{ + Data: &components.V2PostTransaction{ + Metadata: map[string]string{}, + Postings: []components.V2Posting{{ + Amount: big.NewInt(100), + Asset: "USD/2", + Destination: "bank", + Source: "world", + }}, + Timestamp: &now, + }, + }), + components.CreateV2BulkElementCreateTransaction(components.V2BulkElementCreateTransaction{ + Data: &components.V2PostTransaction{ + Metadata: map[string]string{}, + Postings: []components.V2Posting{{ + Amount: big.NewInt(100), + Asset: "USD/2", + Destination: "bank", + Source: "world", + }}, + Timestamp: &now, + }, + }), + } + }) + JustBeforeEach(func() { + stream := bytes.NewBuffer(nil) + for _, item := range items { + data, err := json.Marshal(item) + Expect(err).To(Succeed()) + stream.Write(data) + } + stream.Write([]byte("\n")) + + req, err := http.NewRequest(http.MethodPost, testServer.GetValue().URL()+"/v2/default/_bulk", stream) + req.Header.Set("Content-Type", "application/vnd.formance.ledger.api.v2.bulk+json-stream") + Expect(err).To(Succeed()) + + rsp, err := http.DefaultClient.Do(req) + Expect(err).To(Succeed()) + Expect(rsp.StatusCode).To(Equal(http.StatusOK)) + }) + It("should be ok", func() { + Expect(err).To(Succeed()) + + txs, err := ListTransactions(ctx, testServer.GetValue(), operations.V2ListTransactionsRequest{ + Ledger: "default", + }) + Expect(err).To(Succeed()) + Expect(txs.Data).To(HaveLen(2)) + }) + }) When("creating a bulk with an error on a ledger", func() { var ( now = time.Now().Round(time.Microsecond).UTC()