diff --git a/Makefile b/Makefile index b915a78..dfad62a 100644 --- a/Makefile +++ b/Makefile @@ -45,6 +45,7 @@ migrate-up: migrate-down: migrate -path pkg/db/migration -database "$(POSTGRES_URL)" -verbose down + new-migration: migrate create -ext sql -dir pkg/db/migration -seq $(name) diff --git a/app.compose.yml b/app.compose.yml index 3e3360f..2e1f913 100644 --- a/app.compose.yml +++ b/app.compose.yml @@ -24,7 +24,7 @@ services: volumes: - dexon_openobserve_data:/data api: - image: dexon-api + image: dexon-api container_name: dexon-api build: context: . diff --git a/cmd/api/server/routes.go b/cmd/api/server/routes.go index ce9c643..9b5b0f4 100644 --- a/cmd/api/server/routes.go +++ b/cmd/api/server/routes.go @@ -4,6 +4,7 @@ import ( "github.com/labstack/echo/v4" "github.com/zuni-lab/dexon-service/internal/chat" "github.com/zuni-lab/dexon-service/internal/health" + "github.com/zuni-lab/dexon-service/internal/orders" "github.com/zuni-lab/dexon-service/internal/pools" ) @@ -12,4 +13,5 @@ func setupRoute(e *echo.Echo) { health.Route(e, "/health") pools.Route(api, "/pools") chat.Route(api, "/chat") + orders.Route(api, "/orders") } diff --git a/cmd/worker/job/price_tracker.go b/cmd/worker/job/price_tracker.go new file mode 100644 index 0000000..2443b10 --- /dev/null +++ b/cmd/worker/job/price_tracker.go @@ -0,0 +1,210 @@ +package job + +import ( + "context" + "fmt" + "github.com/zuni-lab/dexon-service/internal/orders/services" + "github.com/zuni-lab/dexon-service/pkg/utils" + "strings" + "sync" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/rs/zerolog/log" + "github.com/zuni-lab/dexon-service/config" + "github.com/zuni-lab/dexon-service/pkg/evm" +) + +type PriceTracker struct { + client *ethclient.Client + backoff *backoff.ExponentialBackOff + maxAttempts uint64 +} + +func NewPriceTracker() *PriceTracker { + b := backoff.NewExponentialBackOff() + b.InitialInterval = 1 * time.Second + b.MaxInterval = 1 * time.Minute + b.MaxElapsedTime = 30 * time.Minute + + return &PriceTracker{ + backoff: b, + maxAttempts: 100, + } +} + +func (p *PriceTracker) Start(ctx context.Context) error { + for { + if err := p.connect(); err != nil { + log.Error().Err(err).Msg("Failed to connect to Ethereum client") + continue + } + + if err := p.WatchPools(ctx); err != nil { + log.Error().Err(err).Msg("Error watching pools") + if p.client != nil { + p.client.Close() + p.client = nil + } + continue + } + + return nil + } +} + +func (p *PriceTracker) connect() error { + url := strings.Replace(config.Env.AlchemyUrl, "https", "wss", 1) + + var client *ethclient.Client + var err error + + operation := func() error { + log.Info().Msg("Attempting to connect to Ethereum client...") + client, err = ethclient.Dial(url) + if err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + return nil + } + + if err := backoff.Retry(operation, p.backoff); err != nil { + return err + } + + p.client = client + return nil +} + +func (p *PriceTracker) WatchPools(ctx context.Context) error { + pools := []common.Address{ + common.HexToAddress("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640"), + common.HexToAddress("0xc7bbec68d12a0d1830360f8ec58fa599ba1b0e9b"), + } + + errChan := make(chan error, len(pools)) + var wg sync.WaitGroup + + for _, pool := range pools { + wg.Add(1) + go func(pool common.Address) { + defer wg.Done() + if err := p.WatchPool(ctx, pool); err != nil { + select { + case errChan <- fmt.Errorf("pool %s: %w", pool.Hex(), err): + default: + } + } + }(pool) + } + + go func() { + wg.Wait() + close(errChan) + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errChan: + return err + } +} + +func (p *PriceTracker) WatchPool(ctx context.Context, pool common.Address) error { + for { + if err := p.watchPoolWithRetry(ctx, pool); err != nil { + if ctx.Err() != nil { + return ctx.Err() + } + log.Error(). + Err(err). + Str("pool", pool.Hex()). + Msg("Error watching pool, will retry") + continue + } + return nil + } +} + +func (p *PriceTracker) watchPoolWithRetry(ctx context.Context, pool common.Address) error { + log.Info().Msgf("Watching pool %s", pool.Hex()) + attempt := 0 +RETRY: + for attempt < int(p.maxAttempts) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + contract, err := evm.NewUniswapV3(pool, p.client) + if err != nil { + return fmt.Errorf("failed to create contract instance: %w", err) + } + + watchOpts := &bind.WatchOpts{Context: ctx} + sink := make(chan *evm.UniswapV3Swap) + + sub, err := contract.WatchSwap(watchOpts, sink, nil, nil) + if err != nil { + attempt++ + nextBackoff := p.backoff.NextBackOff() + if nextBackoff == backoff.Stop { + return fmt.Errorf("max elapsed time reached after %d attempts", attempt) + } + log.Warn(). + Err(err). + Int("attempt", attempt). + Dur("backoff", nextBackoff). + Str("pool", pool.Hex()). + Msg("Failed to watch swaps, retrying...") + time.Sleep(nextBackoff) + continue + } + defer sub.Unsubscribe() + + attempt = 0 + p.backoff.Reset() + + log.Info().Msg("🚀 Ready to watch pool") + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-sub.Err(): + if err != nil { + log.Warn(). + Err(err). + Str("pool", pool.Hex()). + Msg("Subscription error, reconnecting...") + goto RETRY + } + case event := <-sink: + if err := p.handleEvent(event); err != nil { + log.Error(). + Err(err). + Str("pool", pool.Hex()). + Msg("Error handling event") + } + } + } + } + + return fmt.Errorf("failed to maintain subscription after %d attempts", p.maxAttempts) +} + +func (p *PriceTracker) handleEvent(event *evm.UniswapV3Swap) error { + price := utils.CalculatePrice(nil, 0, 0, false) + _, err := services.MatchOrder(context.Background(), price.String()) + if err != nil { + log.Info().Any("event", event).Err(err).Msgf("[PriceTracker] [HandleEvent] failed to match order") + } + + log.Info().Any("event", event).Msgf("[PriceTracker] [HandleEvent] handled %s event", event.Raw.Address.Hex()) + return nil +} diff --git a/go.mod b/go.mod index 0fe1305..36a05c6 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,10 @@ require ( github.com/ethereum/go-ethereum v1.15.2 github.com/go-playground/validator/v10 v10.24.0 github.com/golang-migrate/migrate/v4 v4.18.2 + github.com/google/btree v1.1.3 github.com/jackc/pgx/v5 v5.7.2 github.com/jedib0t/go-pretty/v6 v6.6.6 + github.com/jinzhu/copier v0.4.0 github.com/joho/godotenv v1.5.1 github.com/labstack/echo/v4 v4.13.3 github.com/openai/openai-go v0.1.0-alpha.59 diff --git a/go.sum b/go.sum index 175c794..15fc337 100644 --- a/go.sum +++ b/go.sum @@ -103,6 +103,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= +github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= @@ -141,6 +143,8 @@ github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7Bd github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jedib0t/go-pretty/v6 v6.6.6 h1:LyezkL+1SuqH2z47e5IMQkYUIcs2BD+MnpdPRiRcN0c= github.com/jedib0t/go-pretty/v6 v6.6.6/go.mod h1:YwC5CE4fJ1HFUDeivSV1r//AmANFHyqczZk+U6BDALU= +github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8= +github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4= diff --git a/internal/orders/handlers/create.go b/internal/orders/handlers/create.go new file mode 100644 index 0000000..325f7ab --- /dev/null +++ b/internal/orders/handlers/create.go @@ -0,0 +1,25 @@ +package handlers + +import ( + "github.com/zuni-lab/dexon-service/internal/orders/services" + "net/http" + + "github.com/labstack/echo/v4" + "github.com/zuni-lab/dexon-service/pkg/utils" +) + +func Create(c echo.Context) error { + ctx := c.Request().Context() + + var body services.CreateOrderBody + if err := utils.BindAndValidate(c, &body); err != nil { + return err + } + + order, err := services.CreateOrder(ctx, body) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err) + } + + return c.JSON(http.StatusOK, order) +} diff --git a/internal/orders/handlers/list.go.go b/internal/orders/handlers/list.go.go new file mode 100644 index 0000000..1627451 --- /dev/null +++ b/internal/orders/handlers/list.go.go @@ -0,0 +1,25 @@ +package handlers + +import ( + "github.com/zuni-lab/dexon-service/internal/orders/services" + "net/http" + + "github.com/labstack/echo/v4" + "github.com/zuni-lab/dexon-service/pkg/utils" +) + +func List(c echo.Context) error { + ctx := c.Request().Context() + + var query services.ListOrdersByWalletQuery + if err := utils.BindAndValidate(c, &query); err != nil { + return err + } + + orders, err := services.ListOrderByWallet(ctx, query) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err) + } + + return c.JSON(http.StatusOK, orders) +} diff --git a/internal/orders/route.go b/internal/orders/route.go new file mode 100644 index 0000000..4738ddb --- /dev/null +++ b/internal/orders/route.go @@ -0,0 +1,22 @@ +package orders + +import ( + "github.com/labstack/echo/v4" + "github.com/zuni-lab/dexon-service/internal/orders/handlers" +) + +func Route(g *echo.Group, path string) { + // TODO: add middleware here + + middleware := echo.MiddlewareFunc(func(next echo.HandlerFunc) echo.HandlerFunc { + return func(c echo.Context) error { + // TODO: add middleware here + return next(c) + } + }) + + ordersGroup := g.Group(path, middleware) + + ordersGroup.GET("", handlers.List) + ordersGroup.POST("", handlers.Create) +} diff --git a/internal/orders/services/order.go b/internal/orders/services/order.go new file mode 100644 index 0000000..3f45028 --- /dev/null +++ b/internal/orders/services/order.go @@ -0,0 +1,155 @@ +package services + +import ( + "context" + "errors" + "github.com/jackc/pgx/v5/pgtype" + "github.com/jinzhu/copier" + "github.com/zuni-lab/dexon-service/pkg/db" + "slices" + "time" +) + +type ListOrdersByWalletQuery struct { + Wallet string `query:"wallet" validate:"eth_addr"` + Limit int32 `query:"limit" validate:"gt=0"` + Offset int32 `query:"offset" validate:"gte=0"` +} + +type ListOrdersByWalletResponseItem struct { + db.Order + Children []db.Order `json:"children"` +} + +func ListOrderByWallet(ctx context.Context, query ListOrdersByWalletQuery) ([]ListOrdersByWalletResponseItem, error) { + var params db.GetOrdersByWalletParams + if err := copier.Copy(¶ms, &query); err != nil { + return nil, err + } + + orders, err := db.DB.GetOrdersByWallet(ctx, params) + if err != nil { + return nil, err + } + + var ( + item ListOrdersByWalletResponseItem + res = []ListOrdersByWalletResponseItem{} + ) + for _, order := range orders { + if idx := slices.IndexFunc(res, func(item ListOrdersByWalletResponseItem) bool { + return item.ID == order.ID + }); idx != -1 { + res[idx].Children = append(res[idx].Children, order.Order) + } + + err = copier.Copy(&item, &order) + if err != nil { + return nil, err + } + res = append(res, item) + } + + return res, nil +} + +type CreateOrderBody struct { + Wallet string `json:"wallet" validate:"eth_addr"` + Token0 string `json:"token0" validate:"eth_addr"` + Token1 string `json:"token1" validate:"eth_addr"` + Side db.OrderSide `json:"side" validate:"oneof=BUY SELL"` + Type db.OrderType `json:"type" validate:"oneof=MARKET LIMIT STOP TWAP"` + Price string `json:"price" validate:"numeric,gt=0"` + Amount string `json:"amount" validate:"numeric,gt=0"` + TwapTotalTime *int32 `json:"twap_total_time" validate:"omitempty,gt=0"` +} + +func CreateOrder(ctx context.Context, body CreateOrderBody) (*db.Order, error) { + var ( + pool db.Pool + params db.InsertOrderParams + ) + + if err := copier.Copy(¶ms, &body); err != nil { + return nil, err + } + + pool, err := db.DB.GetPoolByToken(ctx, db.GetPoolByTokenParams{ + Token0ID: body.Token0, + Token1ID: body.Token1, + }) + if err != nil { + return nil, err + } + + params.PoolID = pool.ID + if params.Type == db.OrderTypeMARKET { + params.Status = db.OrderStatusFILLED + _ = params.FilledAt.Scan(time.Now()) + } else { + params.Status = db.OrderStatusPENDING + } + + order, err := db.DB.InsertOrder(ctx, params) + if err != nil { + return nil, err + } + + return &order, nil +} + +func FillPartialOrder(ctx context.Context, parent db.Order, price, amount string) (*db.Order, error) { + var params db.InsertOrderParams + if err := copier.Copy(¶ms, &parent); err != nil { + return nil, err + } + + _ = params.ParentID.Scan(parent.ID) + _ = params.FilledAt.Scan(time.Now()) + _ = params.Price.Scan(price) + _ = params.Amount.Scan(amount) + params.Status = db.OrderStatusFILLED + + order, err := db.DB.InsertOrder(ctx, params) + if err != nil { + return nil, err + } + + return &order, nil +} + +func MatchOrder(ctx context.Context, price string) (*db.Order, error) { + var numericPrice pgtype.Numeric + err := numericPrice.Scan(price) + if err != nil { + return nil, err + } + + order, err := db.DB.GetMatchedOrder(ctx, numericPrice) + if err != nil { + return nil, err + } + + // TODO: Call to contract + + params := db.UpdateOrderParams{ + ID: order.ID, + } + if order.Type == db.OrderTypeLIMIT || + order.Type == db.OrderTypeSTOP || + order.Type == db.OrderTypeTWAP && order.TwapAmount.Int.Cmp(order.Amount.Int) == 0 { + _ = params.FilledAt.Scan(time.Now()) + params.Status = db.OrderStatusFILLED + } else if order.Type == db.OrderTypeTWAP { + params.Status = db.OrderStatusPARTIALFILLED + } else { + return nil, errors.New("invalid order") + } + + order, err = db.DB.UpdateOrder(ctx, params) + if err != nil { + return nil, err + } + + return &order, nil +} diff --git a/pkg/db/migration/000002_create_orders_table.down.sql b/pkg/db/migration/000002_create_orders_table.down.sql new file mode 100644 index 0000000..30d483d --- /dev/null +++ b/pkg/db/migration/000002_create_orders_table.down.sql @@ -0,0 +1,5 @@ +DROP TABLE IF EXISTS orders; + +DROP TYPE IF EXISTS ORDER_STATUS; +DROP TYPE IF EXISTS ORDER_SIDE; +DROP TYPE IF EXISTS ORDER_TYPE; diff --git a/pkg/db/migration/000002_create_orders_table.up.sql b/pkg/db/migration/000002_create_orders_table.up.sql new file mode 100644 index 0000000..320e0be --- /dev/null +++ b/pkg/db/migration/000002_create_orders_table.up.sql @@ -0,0 +1,24 @@ +CREATE TYPE ORDER_STATUS AS ENUM ('PENDING', 'PARTIAL_FILLED' ,'FILLED', 'REJECTED', 'CANCELLED'); +CREATE TYPE ORDER_SIDE AS ENUM ('BUY', 'SELL'); +CREATE TYPE ORDER_TYPE AS ENUM ('MARKET', 'LIMIT', 'STOP', 'TWAP'); + +CREATE TABLE IF NOT EXISTS orders ( + id BIGSERIAL PRIMARY KEY, + pool_id VARCHAR(42) NOT NULL, + parent_id BIGINT, + wallet VARCHAR(42), + status ORDER_STATUS NOT NULL DEFAULT 'PENDING', + side ORDER_SIDE NOT NULL, + type ORDER_TYPE NOT NULL, + price NUMERIC(78,18) NOT NULL, + amount NUMERIC(78,18) NOT NULL, + twap_amount NUMERIC(78,18), + twap_parts INT, + partial_filled_at TIMESTAMP WITH TIME ZONE, + filled_at TIMESTAMP WITH TIME ZONE, + cancelled_at TIMESTAMP WITH TIME ZONE, + created_at TIMESTAMP WITH TIME ZONE, + + FOREIGN KEY (pool_id) REFERENCES pools(id) ON DELETE CASCADE, + FOREIGN KEY (parent_id) REFERENCES orders(id) ON DELETE CASCADE +); diff --git a/pkg/db/models.go b/pkg/db/models.go index b378255..97c2416 100644 --- a/pkg/db/models.go +++ b/pkg/db/models.go @@ -5,9 +5,161 @@ package db import ( + "database/sql/driver" + "fmt" + "github.com/jackc/pgx/v5/pgtype" ) +type OrderSide string + +const ( + OrderSideBUY OrderSide = "BUY" + OrderSideSELL OrderSide = "SELL" +) + +func (e *OrderSide) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = OrderSide(s) + case string: + *e = OrderSide(s) + default: + return fmt.Errorf("unsupported scan type for OrderSide: %T", src) + } + return nil +} + +type NullOrderSide struct { + OrderSide OrderSide `json:"order_side"` + Valid bool `json:"valid"` // Valid is true if OrderSide is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullOrderSide) Scan(value interface{}) error { + if value == nil { + ns.OrderSide, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.OrderSide.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullOrderSide) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.OrderSide), nil +} + +type OrderStatus string + +const ( + OrderStatusPENDING OrderStatus = "PENDING" + OrderStatusPARTIALFILLED OrderStatus = "PARTIAL_FILLED" + OrderStatusFILLED OrderStatus = "FILLED" + OrderStatusREJECTED OrderStatus = "REJECTED" + OrderStatusCANCELLED OrderStatus = "CANCELLED" +) + +func (e *OrderStatus) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = OrderStatus(s) + case string: + *e = OrderStatus(s) + default: + return fmt.Errorf("unsupported scan type for OrderStatus: %T", src) + } + return nil +} + +type NullOrderStatus struct { + OrderStatus OrderStatus `json:"order_status"` + Valid bool `json:"valid"` // Valid is true if OrderStatus is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullOrderStatus) Scan(value interface{}) error { + if value == nil { + ns.OrderStatus, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.OrderStatus.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullOrderStatus) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.OrderStatus), nil +} + +type OrderType string + +const ( + OrderTypeMARKET OrderType = "MARKET" + OrderTypeLIMIT OrderType = "LIMIT" + OrderTypeSTOP OrderType = "STOP" + OrderTypeTWAP OrderType = "TWAP" +) + +func (e *OrderType) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = OrderType(s) + case string: + *e = OrderType(s) + default: + return fmt.Errorf("unsupported scan type for OrderType: %T", src) + } + return nil +} + +type NullOrderType struct { + OrderType OrderType `json:"order_type"` + Valid bool `json:"valid"` // Valid is true if OrderType is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullOrderType) Scan(value interface{}) error { + if value == nil { + ns.OrderType, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.OrderType.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullOrderType) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.OrderType), nil +} + +type Order struct { + ID int64 `json:"id"` + PoolID string `json:"pool_id"` + ParentID pgtype.Int8 `json:"parent_id"` + Wallet pgtype.Text `json:"wallet"` + Status OrderStatus `json:"status"` + Side OrderSide `json:"side"` + Type OrderType `json:"type"` + Price pgtype.Numeric `json:"price"` + Amount pgtype.Numeric `json:"amount"` + TwapAmount pgtype.Numeric `json:"twap_amount"` + TwapParts pgtype.Int4 `json:"twap_parts"` + PartialFilledAt pgtype.Timestamptz `json:"partial_filled_at"` + FilledAt pgtype.Timestamptz `json:"filled_at"` + CancelledAt pgtype.Timestamptz `json:"cancelled_at"` + CreatedAt pgtype.Timestamptz `json:"created_at"` +} + type Pool struct { ID string `json:"id"` Token0ID string `json:"token0_id"` diff --git a/pkg/db/orders.sql.go b/pkg/db/orders.sql.go new file mode 100644 index 0000000..c14e298 --- /dev/null +++ b/pkg/db/orders.sql.go @@ -0,0 +1,289 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.28.0 +// source: orders.sql + +package db + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const getMatchedOrder = `-- name: GetMatchedOrder :one +SELECT id, pool_id, parent_id, wallet, status, side, type, price, amount, twap_amount, twap_parts, partial_filled_at, filled_at, cancelled_at, created_at FROM orders +WHERE ( + (side = 'BUY' AND type = 'LIMIT' AND price <= $1) + OR (side = 'SELL' AND type = 'LIMIT' AND price >= $1) + OR (side = 'BUY' AND type = 'STOP' AND price >= $1) + OR (side = 'SELL' AND type 'STOP' AND price <= $1) + OR (side = 'BUY' AND type = 'TWAP' AND price <= $1) + OR (side = 'SELL' AND type = 'TWAP' AND price >= $1) + ) + AND status IN ('PENDING', 'PARTIAL_FILLED') + AND type <> 'TWAP' +ORDER BY created_at ASC +LIMIT 1 +` + +func (q *Queries) GetMatchedOrder(ctx context.Context, price pgtype.Numeric) (Order, error) { + row := q.db.QueryRow(ctx, getMatchedOrder, price) + var i Order + err := row.Scan( + &i.ID, + &i.PoolID, + &i.ParentID, + &i.Wallet, + &i.Status, + &i.Side, + &i.Type, + &i.Price, + &i.Amount, + &i.TwapAmount, + &i.TwapParts, + &i.PartialFilledAt, + &i.FilledAt, + &i.CancelledAt, + &i.CreatedAt, + ) + return i, err +} + +const getOrdersByStatus = `-- name: GetOrdersByStatus :many +SELECT id, pool_id, parent_id, wallet, status, side, type, price, amount, twap_amount, twap_parts, partial_filled_at, filled_at, cancelled_at, created_at FROM orders +WHERE status = ANY($1::varchar[]) +` + +func (q *Queries) GetOrdersByStatus(ctx context.Context, status []string) ([]Order, error) { + rows, err := q.db.Query(ctx, getOrdersByStatus, status) + if err != nil { + return nil, err + } + defer rows.Close() + items := []Order{} + for rows.Next() { + var i Order + if err := rows.Scan( + &i.ID, + &i.PoolID, + &i.ParentID, + &i.Wallet, + &i.Status, + &i.Side, + &i.Type, + &i.Price, + &i.Amount, + &i.TwapAmount, + &i.TwapParts, + &i.PartialFilledAt, + &i.FilledAt, + &i.CancelledAt, + &i.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getOrdersByWallet = `-- name: GetOrdersByWallet :many +SELECT o1.id, o1.pool_id, o1.parent_id, o1.wallet, o1.status, o1.side, o1.type, o1.price, o1.amount, o1.twap_amount, o1.twap_parts, o1.partial_filled_at, o1.filled_at, o1.cancelled_at, o1.created_at, o2.id, o2.pool_id, o2.parent_id, o2.wallet, o2.status, o2.side, o2.type, o2.price, o2.amount, o2.twap_amount, o2.twap_parts, o2.partial_filled_at, o2.filled_at, o2.cancelled_at, o2.created_at FROM orders AS o1 +LEFT JOIN orders AS o2 ON o1.id = o2.parent_id AND o2.parent_id IS NOT NULL +WHERE o1.wallet = $1 +ORDER BY o1.created_at DESC +LIMIT $2 OFFSET $3 +` + +type GetOrdersByWalletParams struct { + Wallet pgtype.Text `json:"wallet"` + Limit int32 `json:"limit"` + Offset int32 `json:"offset"` +} + +type GetOrdersByWalletRow struct { + ID int64 `json:"id"` + PoolID string `json:"pool_id"` + ParentID pgtype.Int8 `json:"parent_id"` + Wallet pgtype.Text `json:"wallet"` + Status OrderStatus `json:"status"` + Side OrderSide `json:"side"` + Type OrderType `json:"type"` + Price pgtype.Numeric `json:"price"` + Amount pgtype.Numeric `json:"amount"` + TwapAmount pgtype.Numeric `json:"twap_amount"` + TwapParts pgtype.Int4 `json:"twap_parts"` + PartialFilledAt pgtype.Timestamptz `json:"partial_filled_at"` + FilledAt pgtype.Timestamptz `json:"filled_at"` + CancelledAt pgtype.Timestamptz `json:"cancelled_at"` + CreatedAt pgtype.Timestamptz `json:"created_at"` + Order Order `json:"order"` +} + +func (q *Queries) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletParams) ([]GetOrdersByWalletRow, error) { + rows, err := q.db.Query(ctx, getOrdersByWallet, arg.Wallet, arg.Limit, arg.Offset) + if err != nil { + return nil, err + } + defer rows.Close() + items := []GetOrdersByWalletRow{} + for rows.Next() { + var i GetOrdersByWalletRow + if err := rows.Scan( + &i.ID, + &i.PoolID, + &i.ParentID, + &i.Wallet, + &i.Status, + &i.Side, + &i.Type, + &i.Price, + &i.Amount, + &i.TwapAmount, + &i.TwapParts, + &i.PartialFilledAt, + &i.FilledAt, + &i.CancelledAt, + &i.CreatedAt, + &i.Order.ID, + &i.Order.PoolID, + &i.Order.ParentID, + &i.Order.Wallet, + &i.Order.Status, + &i.Order.Side, + &i.Order.Type, + &i.Order.Price, + &i.Order.Amount, + &i.Order.TwapAmount, + &i.Order.TwapParts, + &i.Order.PartialFilledAt, + &i.Order.FilledAt, + &i.Order.CancelledAt, + &i.Order.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const insertOrder = `-- name: InsertOrder :one +INSERT INTO orders (parent_id, wallet, pool_id, side, status, type, price, amount, twap_amount, twap_parts, filled_at, partial_filled_at, cancelled_at) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) +RETURNING id, pool_id, parent_id, wallet, status, side, type, price, amount, twap_amount, twap_parts, partial_filled_at, filled_at, cancelled_at, created_at +` + +type InsertOrderParams struct { + ParentID pgtype.Int8 `json:"parent_id"` + Wallet pgtype.Text `json:"wallet"` + PoolID string `json:"pool_id"` + Side OrderSide `json:"side"` + Status OrderStatus `json:"status"` + Type OrderType `json:"type"` + Price pgtype.Numeric `json:"price"` + Amount pgtype.Numeric `json:"amount"` + TwapAmount pgtype.Numeric `json:"twap_amount"` + TwapParts pgtype.Int4 `json:"twap_parts"` + FilledAt pgtype.Timestamptz `json:"filled_at"` + PartialFilledAt pgtype.Timestamptz `json:"partial_filled_at"` + CancelledAt pgtype.Timestamptz `json:"cancelled_at"` +} + +func (q *Queries) InsertOrder(ctx context.Context, arg InsertOrderParams) (Order, error) { + row := q.db.QueryRow(ctx, insertOrder, + arg.ParentID, + arg.Wallet, + arg.PoolID, + arg.Side, + arg.Status, + arg.Type, + arg.Price, + arg.Amount, + arg.TwapAmount, + arg.TwapParts, + arg.FilledAt, + arg.PartialFilledAt, + arg.CancelledAt, + ) + var i Order + err := row.Scan( + &i.ID, + &i.PoolID, + &i.ParentID, + &i.Wallet, + &i.Status, + &i.Side, + &i.Type, + &i.Price, + &i.Amount, + &i.TwapAmount, + &i.TwapParts, + &i.PartialFilledAt, + &i.FilledAt, + &i.CancelledAt, + &i.CreatedAt, + ) + return i, err +} + +const updateOrder = `-- name: UpdateOrder :one +UPDATE orders +SET + status = COALESCE($2, status), + twap_amount = COALESCE($3, twap_amount), + filled_at = COALESCE($4, filled_at), + cancelled_at = COALESCE($5, cancelled_at), + twap_amount = COALESCE($6, twap_amount), + partial_filled_at = COALESCE($7, partial_filled_at) +WHERE id = $1 +RETURNING id, pool_id, parent_id, wallet, status, side, type, price, amount, twap_amount, twap_parts, partial_filled_at, filled_at, cancelled_at, created_at +` + +type UpdateOrderParams struct { + ID int64 `json:"id"` + Status OrderStatus `json:"status"` + TwapAmount pgtype.Numeric `json:"twap_amount"` + FilledAt pgtype.Timestamptz `json:"filled_at"` + CancelledAt pgtype.Timestamptz `json:"cancelled_at"` + TwapAmount_2 pgtype.Numeric `json:"twap_amount_2"` + PartialFilledAt pgtype.Timestamptz `json:"partial_filled_at"` +} + +func (q *Queries) UpdateOrder(ctx context.Context, arg UpdateOrderParams) (Order, error) { + row := q.db.QueryRow(ctx, updateOrder, + arg.ID, + arg.Status, + arg.TwapAmount, + arg.FilledAt, + arg.CancelledAt, + arg.TwapAmount_2, + arg.PartialFilledAt, + ) + var i Order + err := row.Scan( + &i.ID, + &i.PoolID, + &i.ParentID, + &i.Wallet, + &i.Status, + &i.Side, + &i.Type, + &i.Price, + &i.Amount, + &i.TwapAmount, + &i.TwapParts, + &i.PartialFilledAt, + &i.FilledAt, + &i.CancelledAt, + &i.CreatedAt, + ) + return i, err +} diff --git a/pkg/db/pools.sql.go b/pkg/db/pools.sql.go index 8fc66c4..493df86 100644 --- a/pkg/db/pools.sql.go +++ b/pkg/db/pools.sql.go @@ -50,6 +50,28 @@ func (q *Queries) GetPool(ctx context.Context, id string) (Pool, error) { return i, err } +const getPoolByToken = `-- name: GetPoolByToken :one +SELECT id, token0_id, token1_id, created_at FROM pools +WHERE token0_id = $1 AND token1_id = $2 LIMIT 1 +` + +type GetPoolByTokenParams struct { + Token0ID string `json:"token0_id"` + Token1ID string `json:"token1_id"` +} + +func (q *Queries) GetPoolByToken(ctx context.Context, arg GetPoolByTokenParams) (Pool, error) { + row := q.db.QueryRow(ctx, getPoolByToken, arg.Token0ID, arg.Token1ID) + var i Pool + err := row.Scan( + &i.ID, + &i.Token0ID, + &i.Token1ID, + &i.CreatedAt, + ) + return i, err +} + const getPools = `-- name: GetPools :many SELECT id, token0_id, token1_id, created_at FROM pools ` diff --git a/pkg/db/querier.go b/pkg/db/querier.go index d62334b..d680cf0 100644 --- a/pkg/db/querier.go +++ b/pkg/db/querier.go @@ -6,6 +6,8 @@ package db import ( "context" + + "github.com/jackc/pgx/v5/pgtype" ) type Querier interface { @@ -13,11 +15,17 @@ type Querier interface { CreatePrice(ctx context.Context, arg CreatePriceParams) (Price, error) CreateToken(ctx context.Context, arg CreateTokenParams) (Token, error) GetMarketData(ctx context.Context, arg GetMarketDataParams) ([]GetMarketDataRow, error) + GetMatchedOrder(ctx context.Context, price pgtype.Numeric) (Order, error) + GetOrdersByStatus(ctx context.Context, status []string) ([]Order, error) + GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletParams) ([]GetOrdersByWalletRow, error) GetPool(ctx context.Context, id string) (Pool, error) + GetPoolByToken(ctx context.Context, arg GetPoolByTokenParams) (Pool, error) GetPools(ctx context.Context) ([]Pool, error) GetPriceByPoolID(ctx context.Context, poolID string) (Price, error) GetPrices(ctx context.Context, arg GetPricesParams) ([]Price, error) + InsertOrder(ctx context.Context, arg InsertOrderParams) (Order, error) PoolDetails(ctx context.Context, id string) (PoolDetailsRow, error) + UpdateOrder(ctx context.Context, arg UpdateOrderParams) (Order, error) } var _ Querier = (*Queries)(nil) diff --git a/pkg/db/query/orders.sql b/pkg/db/query/orders.sql new file mode 100644 index 0000000..b010a74 --- /dev/null +++ b/pkg/db/query/orders.sql @@ -0,0 +1,42 @@ +-- name: InsertOrder :one +INSERT INTO orders (parent_id, wallet, pool_id, side, status, type, price, amount, twap_amount, twap_parts, filled_at, partial_filled_at, cancelled_at) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) +RETURNING *; + +-- name: GetOrdersByWallet :many +SELECT o1.*, sqlc.embed(o2) FROM orders AS o1 +LEFT JOIN orders AS o2 ON o1.id = o2.parent_id AND o2.parent_id IS NOT NULL +WHERE o1.wallet = $1 +ORDER BY o1.created_at DESC +LIMIT $2 OFFSET $3; + +-- name: GetOrdersByStatus :many +SELECT * FROM orders +WHERE status = ANY(@status::varchar[]); + +-- name: GetMatchedOrder :one +SELECT * FROM orders +WHERE ( + (side = 'BUY' AND type = 'LIMIT' AND price <= $1) + OR (side = 'SELL' AND type = 'LIMIT' AND price >= $1) + OR (side = 'BUY' AND type = 'STOP' AND price >= $1) + OR (side = 'SELL' AND type 'STOP' AND price <= $1) + OR (side = 'BUY' AND type = 'TWAP' AND price <= $1) + OR (side = 'SELL' AND type = 'TWAP' AND price >= $1) + ) + AND status IN ('PENDING', 'PARTIAL_FILLED') + AND type <> 'TWAP' +ORDER BY created_at ASC +LIMIT 1; + +-- name: UpdateOrder :one +UPDATE orders +SET + status = COALESCE($2, status), + twap_amount = COALESCE($3, twap_amount), + filled_at = COALESCE($4, filled_at), + cancelled_at = COALESCE($5, cancelled_at), + twap_amount = COALESCE($6, twap_amount), + partial_filled_at = COALESCE($7, partial_filled_at) +WHERE id = $1 +RETURNING *; diff --git a/pkg/db/query/pools.sql b/pkg/db/query/pools.sql index 6596de0..fa0bc6b 100644 --- a/pkg/db/query/pools.sql +++ b/pkg/db/query/pools.sql @@ -5,6 +5,10 @@ SELECT * FROM pools; SELECT * FROM pools WHERE id = $1 LIMIT 1; +-- name: GetPoolByToken :one +SELECT * FROM pools +WHERE token0_id = $1 AND token1_id = $2 LIMIT 1; + -- name: CreatePool :one INSERT INTO pools (id, token0_id, token1_id) VALUES ($1, $2, $3) diff --git a/pkg/db/query/prices.sql b/pkg/db/query/prices.sql index b390386..65feeda 100644 --- a/pkg/db/query/prices.sql +++ b/pkg/db/query/prices.sql @@ -8,7 +8,6 @@ SELECT * FROM prices ORDER BY created_at DESC LIMIT $1 OFFSET $2; - -- name: GetPriceByPoolID :one SELECT * FROM prices WHERE pool_id = $1