From 93330ec6443e7b29eebdc1d5412be614c477c7ef Mon Sep 17 00:00:00 2001 From: TP-O Date: Sun, 23 Feb 2025 11:17:02 +0700 Subject: [PATCH 01/10] faet: place stop and limit order --- Makefile | 1 + cmd/worker/job/price_tracker.go | 0 go.mod | 2 + go.sum | 4 + internal/orders/handlers/create.go | 25 +++ internal/orders/handlers/list.go.go | 25 +++ internal/orders/route.go | 22 +++ internal/orders/services/order.go | 52 +++++++ pkg/custom/order.go | 144 ++++++++++++++++++ .../000002_create_orders_table.down.sql | 5 + .../000002_create_orders_table.up.sql | 20 +++ pkg/db/models.go | 144 ++++++++++++++++++ pkg/db/orders.sql.go | 100 ++++++++++++ pkg/db/querier.go | 2 + pkg/db/query/orders.sql | 10 ++ pkg/db/query/prices.sql | 1 - 16 files changed, 556 insertions(+), 1 deletion(-) create mode 100644 cmd/worker/job/price_tracker.go create mode 100644 internal/orders/handlers/create.go create mode 100644 internal/orders/handlers/list.go.go create mode 100644 internal/orders/route.go create mode 100644 internal/orders/services/order.go create mode 100644 pkg/custom/order.go create mode 100644 pkg/db/migration/000002_create_orders_table.down.sql create mode 100644 pkg/db/migration/000002_create_orders_table.up.sql create mode 100644 pkg/db/orders.sql.go create mode 100644 pkg/db/query/orders.sql diff --git a/Makefile b/Makefile index b915a78..dfad62a 100644 --- a/Makefile +++ b/Makefile @@ -45,6 +45,7 @@ migrate-up: migrate-down: migrate -path pkg/db/migration -database "$(POSTGRES_URL)" -verbose down + new-migration: migrate create -ext sql -dir pkg/db/migration -seq $(name) diff --git a/cmd/worker/job/price_tracker.go b/cmd/worker/job/price_tracker.go new file mode 100644 index 0000000..e69de29 diff --git a/go.mod b/go.mod index 0fe1305..36a05c6 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,10 @@ require ( github.com/ethereum/go-ethereum v1.15.2 github.com/go-playground/validator/v10 v10.24.0 github.com/golang-migrate/migrate/v4 v4.18.2 + github.com/google/btree v1.1.3 github.com/jackc/pgx/v5 v5.7.2 github.com/jedib0t/go-pretty/v6 v6.6.6 + github.com/jinzhu/copier v0.4.0 github.com/joho/godotenv v1.5.1 github.com/labstack/echo/v4 v4.13.3 github.com/openai/openai-go v0.1.0-alpha.59 diff --git a/go.sum b/go.sum index 175c794..15fc337 100644 --- a/go.sum +++ b/go.sum @@ -103,6 +103,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= +github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= @@ -141,6 +143,8 @@ github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7Bd github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jedib0t/go-pretty/v6 v6.6.6 h1:LyezkL+1SuqH2z47e5IMQkYUIcs2BD+MnpdPRiRcN0c= github.com/jedib0t/go-pretty/v6 v6.6.6/go.mod h1:YwC5CE4fJ1HFUDeivSV1r//AmANFHyqczZk+U6BDALU= +github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8= +github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4= diff --git a/internal/orders/handlers/create.go b/internal/orders/handlers/create.go new file mode 100644 index 0000000..325f7ab --- /dev/null +++ b/internal/orders/handlers/create.go @@ -0,0 +1,25 @@ +package handlers + +import ( + "github.com/zuni-lab/dexon-service/internal/orders/services" + "net/http" + + "github.com/labstack/echo/v4" + "github.com/zuni-lab/dexon-service/pkg/utils" +) + +func Create(c echo.Context) error { + ctx := c.Request().Context() + + var body services.CreateOrderBody + if err := utils.BindAndValidate(c, &body); err != nil { + return err + } + + order, err := services.CreateOrder(ctx, body) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err) + } + + return c.JSON(http.StatusOK, order) +} diff --git a/internal/orders/handlers/list.go.go b/internal/orders/handlers/list.go.go new file mode 100644 index 0000000..cbaf160 --- /dev/null +++ b/internal/orders/handlers/list.go.go @@ -0,0 +1,25 @@ +package handlers + +import ( + "github.com/zuni-lab/dexon-service/internal/orders/services" + "net/http" + + "github.com/labstack/echo/v4" + "github.com/zuni-lab/dexon-service/pkg/utils" +) + +func List(c echo.Context) error { + ctx := c.Request().Context() + + var query services.ListOrdersByWalletQuery + if err := utils.BindAndValidate(c, &query); err != nil { + return err + } + + candlestick, err := services.ListOrderByWallet(ctx, query) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err) + } + + return c.JSON(http.StatusOK, candlestick) +} diff --git a/internal/orders/route.go b/internal/orders/route.go new file mode 100644 index 0000000..5b7ca3e --- /dev/null +++ b/internal/orders/route.go @@ -0,0 +1,22 @@ +package prices + +import ( + "github.com/labstack/echo/v4" + "github.com/zuni-lab/dexon-service/internal/orders/handlers" +) + +func Route(g *echo.Group, path string) { + // TODO: add middleware here + + middleware := echo.MiddlewareFunc(func(next echo.HandlerFunc) echo.HandlerFunc { + return func(c echo.Context) error { + // TODO: add middleware here + return next(c) + } + }) + + ordersGroup := g.Group(path, middleware) + + ordersGroup.GET("/", handlers.List) + ordersGroup.POST("/", handlers.Create) +} diff --git a/internal/orders/services/order.go b/internal/orders/services/order.go new file mode 100644 index 0000000..7d63b7a --- /dev/null +++ b/internal/orders/services/order.go @@ -0,0 +1,52 @@ +package services + +import ( + "context" + "github.com/jackc/pgx/v5/pgtype" + "github.com/jinzhu/copier" + "github.com/zuni-lab/dexon-service/pkg/custom" + "github.com/zuni-lab/dexon-service/pkg/db" + "time" +) + +type ListOrdersByWalletQuery struct { + Wallet string `json:"wallet" validate:"eth_addr"` + Limit int32 `json:"limit"` + Offset int32 `json:"offset"` +} + +func ListOrderByWallet(ctx context.Context, query ListOrdersByWalletQuery) ([]db.Order, error) { + var params db.GetOrdersByWalletParams + if err := copier.Copy(¶ms, &query); err != nil { + return nil, err + } + + return db.DB.GetOrdersByWallet(ctx, params) +} + +type CreateOrderBody struct { + Wallet string `json:"wallet" validate:"eth_addr"` + FromToken string `json:"from_token" validate:"eth_addr"` + ToToken string `json:"to_token" validate:"eth_addr"` + Side db.OrderSide `json:"side" validate:"oneof=BUY SELL"` + Condition db.OrderCondition `json:"condition" validate:"oneof=LIMIT STOP"` + Price string `json:"price" validate:"numeric,gt=0"` +} + +func CreateOrder(ctx context.Context, body CreateOrderBody) (*db.Order, error) { + params := db.InsertOrderParams{ + CreatedAt: pgtype.Timestamptz{Time: time.Now(), Valid: true}, + } + if err := copier.Copy(¶ms, &body); err != nil { + return nil, err + } + + order, err := db.DB.InsertOrder(ctx, params) + if err != nil { + return nil, err + } + + orderBook := custom.GetOrderBook() + orderBook.Type(order.Side, order.Condition).ReplaceOrInsert(order) + return &order, nil +} diff --git a/pkg/custom/order.go b/pkg/custom/order.go new file mode 100644 index 0000000..5fcade8 --- /dev/null +++ b/pkg/custom/order.go @@ -0,0 +1,144 @@ +package custom + +import ( + "github.com/google/btree" + "github.com/jackc/pgx/v5/pgtype" + "github.com/zuni-lab/dexon-service/pkg/db" + "math/big" + "sync" +) + +var orderBook *OrderBook + +type OrderBook struct { + limitBuy *OrderTree + limitSell *OrderTree + stopBuy *OrderTree + stopSell *OrderTree +} + +type OrderTree struct { + btree.BTreeG[db.Order] + mux sync.RWMutex +} + +func GetOrderBook() *OrderBook { + sync.OnceFunc(func() { + orderBook = &OrderBook{ + limitBuy: newOrderTree(), + limitSell: newOrderTree(), + stopBuy: newOrderTree(), + stopSell: newOrderTree(), + } + })() + + return orderBook +} + +func compareOrder(a, b db.Order) bool { + sub := new(big.Int).Sub(a.Price.Int, b.Price.Int).Sign() + if sub < 0 { + return true + } else if sub > 0 { + return false + } else { + return a.CreatedAt.Time.After(b.CreatedAt.Time) + } +} + +func newOrderTree() *OrderTree { + return &OrderTree{ + BTreeG: *btree.NewG[db.Order](32, compareOrder), + } +} + +func (o *OrderBook) Type(side db.OrderSide, condition db.OrderCondition) *OrderTree { + if side == db.OrderSideBUY && condition == db.OrderConditionLIMIT { + return o.limitBuy + } else if side == db.OrderSideBUY && condition == db.OrderConditionSTOP { + return o.stopBuy + } else if side == db.OrderSideSELL && condition == db.OrderConditionLIMIT { + return o.limitSell + } else if side == db.OrderSideSELL && condition == db.OrderConditionSTOP { + return o.stopSell + } else { + return nil + } +} + +func (o *OrderTree) ReplaceOrInsert(order db.Order) { + o.mux.Lock() + defer o.mux.Unlock() + o.ReplaceOrInsert(order) +} + +func (o *OrderTree) Get(order db.Order) db.Order { + o.mux.RLock() + defer o.mux.RUnlock() + return o.Get(order) +} + +func (o *OrderTree) Delete(order db.Order) { + o.mux.Lock() + defer o.mux.Unlock() + o.Delete(order) +} + +func (o *OrderBook) Match(price pgtype.Numeric, priceTime pgtype.Timestamptz) []*db.Order { + o.stopBuy.mux.RLock() + o.limitBuy.mux.RLock() + o.stopSell.mux.RLock() + o.limitSell.mux.RLock() + defer func() { + o.stopBuy.mux.RUnlock() + o.limitBuy.mux.RUnlock() + o.stopSell.mux.RUnlock() + o.limitSell.mux.RUnlock() + }() + + var ( + stopBuyOrder *db.Order + limitSellOrder *db.Order + limitBuyOrder *db.Order + stopSellOrder *db.Order + matchedOrder *db.Order + + expected = db.Order{ + Price: price, + CreatedAt: priceTime, + } + ) + + orderBook.Type(db.OrderSideBUY, db.OrderConditionSTOP).AscendGreaterOrEqual(expected, func(order db.Order) bool { + stopBuyOrder = &order + return false + }) + + orderBook.Type(db.OrderSideSELL, db.OrderConditionLIMIT).AscendGreaterOrEqual(expected, func(order db.Order) bool { + limitSellOrder = &order + return false + }) + + orderBook.Type(db.OrderSideBUY, db.OrderConditionLIMIT).DescendLessOrEqual(expected, func(order db.Order) bool { + limitBuyOrder = &order + return false + }) + + orderBook.Type(db.OrderSideSELL, db.OrderConditionSTOP).DescendLessOrEqual(expected, func(order db.Order) bool { + stopSellOrder = &order + return false + }) + + matchedOrder = stopBuyOrder + if limitSellOrder != nil && limitSellOrder.CreatedAt.Time.Before(matchedOrder.CreatedAt.Time) { + matchedOrder = limitSellOrder + } + if limitBuyOrder != nil && limitBuyOrder.CreatedAt.Time.Before(matchedOrder.CreatedAt.Time) { + matchedOrder = limitBuyOrder + } + if stopSellOrder != nil && stopSellOrder.CreatedAt.Time.Before(matchedOrder.CreatedAt.Time) { + matchedOrder = stopSellOrder + } + + return []*db.Order{matchedOrder} +} diff --git a/pkg/db/migration/000002_create_orders_table.down.sql b/pkg/db/migration/000002_create_orders_table.down.sql new file mode 100644 index 0000000..30d483d --- /dev/null +++ b/pkg/db/migration/000002_create_orders_table.down.sql @@ -0,0 +1,5 @@ +DROP TABLE IF EXISTS orders; + +DROP TYPE IF EXISTS ORDER_STATUS; +DROP TYPE IF EXISTS ORDER_SIDE; +DROP TYPE IF EXISTS ORDER_TYPE; diff --git a/pkg/db/migration/000002_create_orders_table.up.sql b/pkg/db/migration/000002_create_orders_table.up.sql new file mode 100644 index 0000000..635e90e --- /dev/null +++ b/pkg/db/migration/000002_create_orders_table.up.sql @@ -0,0 +1,20 @@ +CREATE TYPE ORDER_STATUS AS ENUM ('PENDING', 'FILLED', 'CANCELED'); +CREATE TYPE ORDER_SIDE AS ENUM ('BUY', 'SELL'); +CREATE TYPE ORDER_CONDITION AS ENUM ('LIMIT', 'STOP'); + +CREATE TABLE IF NOT EXISTS orders ( + id BIGSERIAL PRIMARY KEY, + wallet VARCHAR(42), + from_token VARCHAR(10) NOT NULL, + to_token VARCHAR(10) NOT NULL, + status ORDER_STATUS NOT NULL DEFAULT 'PENDING', + side ORDER_SIDE NOT NULL, + condition ORDER_CONDITION NOT NULL, + price NUMERIC(78,18) NOT NULL, + filled_at TIMESTAMP WITH TIME ZONE, + cancelled_at TIMESTAMP WITH TIME ZONE, + created_at TIMESTAMP WITH TIME ZONE, + + FOREIGN KEY (from_token) REFERENCES tokens(id) ON DELETE CASCADE, + FOREIGN KEY (to_token) REFERENCES tokens(id) ON DELETE CASCADE +); diff --git a/pkg/db/models.go b/pkg/db/models.go index b378255..8c3bddb 100644 --- a/pkg/db/models.go +++ b/pkg/db/models.go @@ -5,9 +5,153 @@ package db import ( + "database/sql/driver" + "fmt" + "github.com/jackc/pgx/v5/pgtype" ) +type OrderCondition string + +const ( + OrderConditionLIMIT OrderCondition = "LIMIT" + OrderConditionSTOP OrderCondition = "STOP" +) + +func (e *OrderCondition) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = OrderCondition(s) + case string: + *e = OrderCondition(s) + default: + return fmt.Errorf("unsupported scan type for OrderCondition: %T", src) + } + return nil +} + +type NullOrderCondition struct { + OrderCondition OrderCondition `json:"order_condition"` + Valid bool `json:"valid"` // Valid is true if OrderCondition is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullOrderCondition) Scan(value interface{}) error { + if value == nil { + ns.OrderCondition, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.OrderCondition.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullOrderCondition) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.OrderCondition), nil +} + +type OrderSide string + +const ( + OrderSideBUY OrderSide = "BUY" + OrderSideSELL OrderSide = "SELL" +) + +func (e *OrderSide) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = OrderSide(s) + case string: + *e = OrderSide(s) + default: + return fmt.Errorf("unsupported scan type for OrderSide: %T", src) + } + return nil +} + +type NullOrderSide struct { + OrderSide OrderSide `json:"order_side"` + Valid bool `json:"valid"` // Valid is true if OrderSide is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullOrderSide) Scan(value interface{}) error { + if value == nil { + ns.OrderSide, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.OrderSide.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullOrderSide) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.OrderSide), nil +} + +type OrderStatus string + +const ( + OrderStatusPENDING OrderStatus = "PENDING" + OrderStatusFILLED OrderStatus = "FILLED" + OrderStatusCANCELED OrderStatus = "CANCELED" +) + +func (e *OrderStatus) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = OrderStatus(s) + case string: + *e = OrderStatus(s) + default: + return fmt.Errorf("unsupported scan type for OrderStatus: %T", src) + } + return nil +} + +type NullOrderStatus struct { + OrderStatus OrderStatus `json:"order_status"` + Valid bool `json:"valid"` // Valid is true if OrderStatus is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullOrderStatus) Scan(value interface{}) error { + if value == nil { + ns.OrderStatus, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.OrderStatus.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullOrderStatus) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.OrderStatus), nil +} + +type Order struct { + ID int64 `json:"id"` + Wallet pgtype.Text `json:"wallet"` + FromToken string `json:"from_token"` + ToToken string `json:"to_token"` + Status OrderStatus `json:"status"` + Side OrderSide `json:"side"` + Condition OrderCondition `json:"condition"` + Price pgtype.Numeric `json:"price"` + FilledAt pgtype.Timestamptz `json:"filled_at"` + CancelledAt pgtype.Timestamptz `json:"cancelled_at"` + CreatedAt pgtype.Timestamptz `json:"created_at"` +} + type Pool struct { ID string `json:"id"` Token0ID string `json:"token0_id"` diff --git a/pkg/db/orders.sql.go b/pkg/db/orders.sql.go new file mode 100644 index 0000000..f6b4233 --- /dev/null +++ b/pkg/db/orders.sql.go @@ -0,0 +1,100 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.28.0 +// source: orders.sql + +package db + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const getOrdersByWallet = `-- name: GetOrdersByWallet :many +SELECT id, wallet, from_token, to_token, status, side, condition, price, filled_at, cancelled_at, created_at FROM orders +WHERE wallet = $1 +ORDER BY created_at DESC +LIMIT $2 OFFSET $3 +` + +type GetOrdersByWalletParams struct { + Wallet pgtype.Text `json:"wallet"` + Limit int32 `json:"limit"` + Offset int32 `json:"offset"` +} + +func (q *Queries) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletParams) ([]Order, error) { + rows, err := q.db.Query(ctx, getOrdersByWallet, arg.Wallet, arg.Limit, arg.Offset) + if err != nil { + return nil, err + } + defer rows.Close() + items := []Order{} + for rows.Next() { + var i Order + if err := rows.Scan( + &i.ID, + &i.Wallet, + &i.FromToken, + &i.ToToken, + &i.Status, + &i.Side, + &i.Condition, + &i.Price, + &i.FilledAt, + &i.CancelledAt, + &i.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const insertOrder = `-- name: InsertOrder :one +INSERT INTO orders (wallet, from_token, to_token, side, condition, price, created_at) +VALUES ($1, $2, $3, $4, $5, $6, $7) +RETURNING id, wallet, from_token, to_token, status, side, condition, price, filled_at, cancelled_at, created_at +` + +type InsertOrderParams struct { + Wallet pgtype.Text `json:"wallet"` + FromToken string `json:"from_token"` + ToToken string `json:"to_token"` + Side OrderSide `json:"side"` + Condition OrderCondition `json:"condition"` + Price pgtype.Numeric `json:"price"` + CreatedAt pgtype.Timestamptz `json:"created_at"` +} + +func (q *Queries) InsertOrder(ctx context.Context, arg InsertOrderParams) (Order, error) { + row := q.db.QueryRow(ctx, insertOrder, + arg.Wallet, + arg.FromToken, + arg.ToToken, + arg.Side, + arg.Condition, + arg.Price, + arg.CreatedAt, + ) + var i Order + err := row.Scan( + &i.ID, + &i.Wallet, + &i.FromToken, + &i.ToToken, + &i.Status, + &i.Side, + &i.Condition, + &i.Price, + &i.FilledAt, + &i.CancelledAt, + &i.CreatedAt, + ) + return i, err +} diff --git a/pkg/db/querier.go b/pkg/db/querier.go index d62334b..38de318 100644 --- a/pkg/db/querier.go +++ b/pkg/db/querier.go @@ -13,10 +13,12 @@ type Querier interface { CreatePrice(ctx context.Context, arg CreatePriceParams) (Price, error) CreateToken(ctx context.Context, arg CreateTokenParams) (Token, error) GetMarketData(ctx context.Context, arg GetMarketDataParams) ([]GetMarketDataRow, error) + GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletParams) ([]Order, error) GetPool(ctx context.Context, id string) (Pool, error) GetPools(ctx context.Context) ([]Pool, error) GetPriceByPoolID(ctx context.Context, poolID string) (Price, error) GetPrices(ctx context.Context, arg GetPricesParams) ([]Price, error) + InsertOrder(ctx context.Context, arg InsertOrderParams) (Order, error) PoolDetails(ctx context.Context, id string) (PoolDetailsRow, error) } diff --git a/pkg/db/query/orders.sql b/pkg/db/query/orders.sql new file mode 100644 index 0000000..0ca94df --- /dev/null +++ b/pkg/db/query/orders.sql @@ -0,0 +1,10 @@ +-- name: InsertOrder :one +INSERT INTO orders (wallet, from_token, to_token, side, condition, price, created_at) +VALUES ($1, $2, $3, $4, $5, $6, $7) +RETURNING *; + +-- name: GetOrdersByWallet :many +SELECT * FROM orders +WHERE wallet = $1 +ORDER BY created_at DESC +LIMIT $2 OFFSET $3; diff --git a/pkg/db/query/prices.sql b/pkg/db/query/prices.sql index b390386..65feeda 100644 --- a/pkg/db/query/prices.sql +++ b/pkg/db/query/prices.sql @@ -8,7 +8,6 @@ SELECT * FROM prices ORDER BY created_at DESC LIMIT $1 OFFSET $2; - -- name: GetPriceByPoolID :one SELECT * FROM prices WHERE pool_id = $1 From 4aa91f6e69d2f18c69dd1c6a003229344eaa85bd Mon Sep 17 00:00:00 2001 From: TP-O Date: Mon, 24 Feb 2025 20:46:42 +0700 Subject: [PATCH 02/10] feat: create and list twap orders --- internal/orders/services/order.go | 47 +++++++++-- .../000002_create_orders_table.up.sql | 8 +- pkg/db/models.go | 34 ++++---- pkg/db/orders.sql.go | 77 +++++++++++++++---- pkg/db/querier.go | 2 +- pkg/db/query/orders.sql | 11 +-- 6 files changed, 133 insertions(+), 46 deletions(-) diff --git a/internal/orders/services/order.go b/internal/orders/services/order.go index 7d63b7a..35ad6e1 100644 --- a/internal/orders/services/order.go +++ b/internal/orders/services/order.go @@ -6,6 +6,7 @@ import ( "github.com/jinzhu/copier" "github.com/zuni-lab/dexon-service/pkg/custom" "github.com/zuni-lab/dexon-service/pkg/db" + "slices" "time" ) @@ -15,22 +16,52 @@ type ListOrdersByWalletQuery struct { Offset int32 `json:"offset"` } -func ListOrderByWallet(ctx context.Context, query ListOrdersByWalletQuery) ([]db.Order, error) { +type ListOrdersByWalletResponseItem struct { + db.Order + Children []db.Order `json:"children"` +} + +func ListOrderByWallet(ctx context.Context, query ListOrdersByWalletQuery) ([]ListOrdersByWalletResponseItem, error) { var params db.GetOrdersByWalletParams if err := copier.Copy(¶ms, &query); err != nil { return nil, err } - return db.DB.GetOrdersByWallet(ctx, params) + orders, err := db.DB.GetOrdersByWallet(ctx, params) + if err != nil { + return nil, err + } + + var ( + item ListOrdersByWalletResponseItem + res []ListOrdersByWalletResponseItem + ) + for _, order := range orders { + if idx := slices.IndexFunc(res, func(item ListOrdersByWalletResponseItem) bool { + return item.ID == order.ID + }); idx != -1 { + res[idx].Children = append(res[idx].Children, order.Order) + } + + err = copier.Copy(&item, &order) + if err != nil { + return nil, err + } + res = append(res, item) + } + + return res, nil } type CreateOrderBody struct { - Wallet string `json:"wallet" validate:"eth_addr"` - FromToken string `json:"from_token" validate:"eth_addr"` - ToToken string `json:"to_token" validate:"eth_addr"` - Side db.OrderSide `json:"side" validate:"oneof=BUY SELL"` - Condition db.OrderCondition `json:"condition" validate:"oneof=LIMIT STOP"` - Price string `json:"price" validate:"numeric,gt=0"` + Wallet string `json:"wallet" validate:"eth_addr"` + FromToken string `json:"from_token" validate:"eth_addr"` + ToToken string `json:"to_token" validate:"eth_addr"` + Side db.OrderSide `json:"side" validate:"oneof=BUY SELL"` + Condition db.OrderCondition `json:"condition" validate:"oneof=LIMIT STOP TWAP"` + Price string `json:"price" validate:"numeric,gt=0"` + Amount string `json:"amount" validate:"numeric,gt=0"` + TwapTotalTime *int32 `json:"twap_total_time" validate:"omitempty,gt=0"` } func CreateOrder(ctx context.Context, body CreateOrderBody) (*db.Order, error) { diff --git a/pkg/db/migration/000002_create_orders_table.up.sql b/pkg/db/migration/000002_create_orders_table.up.sql index 635e90e..fdad188 100644 --- a/pkg/db/migration/000002_create_orders_table.up.sql +++ b/pkg/db/migration/000002_create_orders_table.up.sql @@ -1,9 +1,10 @@ -CREATE TYPE ORDER_STATUS AS ENUM ('PENDING', 'FILLED', 'CANCELED'); +CREATE TYPE ORDER_STATUS AS ENUM ('PENDING', 'PARTIAL_FILLED' ,'FILLED', 'REJECTED', 'CANCELED'); CREATE TYPE ORDER_SIDE AS ENUM ('BUY', 'SELL'); -CREATE TYPE ORDER_CONDITION AS ENUM ('LIMIT', 'STOP'); +CREATE TYPE ORDER_CONDITION AS ENUM ('LIMIT', 'STOP', 'TWAP'); CREATE TABLE IF NOT EXISTS orders ( id BIGSERIAL PRIMARY KEY, + parent_id BIGINT, wallet VARCHAR(42), from_token VARCHAR(10) NOT NULL, to_token VARCHAR(10) NOT NULL, @@ -11,10 +12,13 @@ CREATE TABLE IF NOT EXISTS orders ( side ORDER_SIDE NOT NULL, condition ORDER_CONDITION NOT NULL, price NUMERIC(78,18) NOT NULL, + amount NUMERIC(78,18) NOT NULL, + twap_total_time INT, filled_at TIMESTAMP WITH TIME ZONE, cancelled_at TIMESTAMP WITH TIME ZONE, created_at TIMESTAMP WITH TIME ZONE, + FOREIGN KEY (parent_id) REFERENCES orders(id) ON DELETE CASCADE, FOREIGN KEY (from_token) REFERENCES tokens(id) ON DELETE CASCADE, FOREIGN KEY (to_token) REFERENCES tokens(id) ON DELETE CASCADE ); diff --git a/pkg/db/models.go b/pkg/db/models.go index 8c3bddb..6cb0800 100644 --- a/pkg/db/models.go +++ b/pkg/db/models.go @@ -16,6 +16,7 @@ type OrderCondition string const ( OrderConditionLIMIT OrderCondition = "LIMIT" OrderConditionSTOP OrderCondition = "STOP" + OrderConditionTWAP OrderCondition = "TWAP" ) func (e *OrderCondition) Scan(src interface{}) error { @@ -98,9 +99,11 @@ func (ns NullOrderSide) Value() (driver.Value, error) { type OrderStatus string const ( - OrderStatusPENDING OrderStatus = "PENDING" - OrderStatusFILLED OrderStatus = "FILLED" - OrderStatusCANCELED OrderStatus = "CANCELED" + OrderStatusPENDING OrderStatus = "PENDING" + OrderStatusPARTIALFILLED OrderStatus = "PARTIAL_FILLED" + OrderStatusFILLED OrderStatus = "FILLED" + OrderStatusREJECTED OrderStatus = "REJECTED" + OrderStatusCANCELED OrderStatus = "CANCELED" ) func (e *OrderStatus) Scan(src interface{}) error { @@ -139,17 +142,20 @@ func (ns NullOrderStatus) Value() (driver.Value, error) { } type Order struct { - ID int64 `json:"id"` - Wallet pgtype.Text `json:"wallet"` - FromToken string `json:"from_token"` - ToToken string `json:"to_token"` - Status OrderStatus `json:"status"` - Side OrderSide `json:"side"` - Condition OrderCondition `json:"condition"` - Price pgtype.Numeric `json:"price"` - FilledAt pgtype.Timestamptz `json:"filled_at"` - CancelledAt pgtype.Timestamptz `json:"cancelled_at"` - CreatedAt pgtype.Timestamptz `json:"created_at"` + ID int64 `json:"id"` + ParentID pgtype.Int8 `json:"parent_id"` + Wallet pgtype.Text `json:"wallet"` + FromToken string `json:"from_token"` + ToToken string `json:"to_token"` + Status OrderStatus `json:"status"` + Side OrderSide `json:"side"` + Condition OrderCondition `json:"condition"` + Price pgtype.Numeric `json:"price"` + Amount pgtype.Numeric `json:"amount"` + TwapTotalTime pgtype.Int4 `json:"twap_total_time"` + FilledAt pgtype.Timestamptz `json:"filled_at"` + CancelledAt pgtype.Timestamptz `json:"cancelled_at"` + CreatedAt pgtype.Timestamptz `json:"created_at"` } type Pool struct { diff --git a/pkg/db/orders.sql.go b/pkg/db/orders.sql.go index f6b4233..9bdb345 100644 --- a/pkg/db/orders.sql.go +++ b/pkg/db/orders.sql.go @@ -12,9 +12,10 @@ import ( ) const getOrdersByWallet = `-- name: GetOrdersByWallet :many -SELECT id, wallet, from_token, to_token, status, side, condition, price, filled_at, cancelled_at, created_at FROM orders -WHERE wallet = $1 -ORDER BY created_at DESC +SELECT o1.id, o1.parent_id, o1.wallet, o1.from_token, o1.to_token, o1.status, o1.side, o1.condition, o1.price, o1.amount, o1.twap_total_time, o1.filled_at, o1.cancelled_at, o1.created_at, o2.id, o2.parent_id, o2.wallet, o2.from_token, o2.to_token, o2.status, o2.side, o2.condition, o2.price, o2.amount, o2.twap_total_time, o2.filled_at, o2.cancelled_at, o2.created_at FROM orders AS o1 +LEFT JOIN orders AS o2 ON o1.id = o2.parent_id AND o2.parent_id IS NOT NULL +WHERE o1.wallet = $1 +ORDER BY o1.created_at DESC LIMIT $2 OFFSET $3 ` @@ -24,17 +25,36 @@ type GetOrdersByWalletParams struct { Offset int32 `json:"offset"` } -func (q *Queries) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletParams) ([]Order, error) { +type GetOrdersByWalletRow struct { + ID int64 `json:"id"` + ParentID pgtype.Int8 `json:"parent_id"` + Wallet pgtype.Text `json:"wallet"` + FromToken string `json:"from_token"` + ToToken string `json:"to_token"` + Status OrderStatus `json:"status"` + Side OrderSide `json:"side"` + Condition OrderCondition `json:"condition"` + Price pgtype.Numeric `json:"price"` + Amount pgtype.Numeric `json:"amount"` + TwapTotalTime pgtype.Int4 `json:"twap_total_time"` + FilledAt pgtype.Timestamptz `json:"filled_at"` + CancelledAt pgtype.Timestamptz `json:"cancelled_at"` + CreatedAt pgtype.Timestamptz `json:"created_at"` + Order Order `json:"order"` +} + +func (q *Queries) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletParams) ([]GetOrdersByWalletRow, error) { rows, err := q.db.Query(ctx, getOrdersByWallet, arg.Wallet, arg.Limit, arg.Offset) if err != nil { return nil, err } defer rows.Close() - items := []Order{} + items := []GetOrdersByWalletRow{} for rows.Next() { - var i Order + var i GetOrdersByWalletRow if err := rows.Scan( &i.ID, + &i.ParentID, &i.Wallet, &i.FromToken, &i.ToToken, @@ -42,9 +62,25 @@ func (q *Queries) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletPa &i.Side, &i.Condition, &i.Price, + &i.Amount, + &i.TwapTotalTime, &i.FilledAt, &i.CancelledAt, &i.CreatedAt, + &i.Order.ID, + &i.Order.ParentID, + &i.Order.Wallet, + &i.Order.FromToken, + &i.Order.ToToken, + &i.Order.Status, + &i.Order.Side, + &i.Order.Condition, + &i.Order.Price, + &i.Order.Amount, + &i.Order.TwapTotalTime, + &i.Order.FilledAt, + &i.Order.CancelledAt, + &i.Order.CreatedAt, ); err != nil { return nil, err } @@ -57,34 +93,41 @@ func (q *Queries) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletPa } const insertOrder = `-- name: InsertOrder :one -INSERT INTO orders (wallet, from_token, to_token, side, condition, price, created_at) -VALUES ($1, $2, $3, $4, $5, $6, $7) -RETURNING id, wallet, from_token, to_token, status, side, condition, price, filled_at, cancelled_at, created_at +INSERT INTO orders (parent_id, wallet, from_token, to_token, side, condition, price, amount, twap_total_time, created_at) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) +RETURNING id, parent_id, wallet, from_token, to_token, status, side, condition, price, amount, twap_total_time, filled_at, cancelled_at, created_at ` type InsertOrderParams struct { - Wallet pgtype.Text `json:"wallet"` - FromToken string `json:"from_token"` - ToToken string `json:"to_token"` - Side OrderSide `json:"side"` - Condition OrderCondition `json:"condition"` - Price pgtype.Numeric `json:"price"` - CreatedAt pgtype.Timestamptz `json:"created_at"` + ParentID pgtype.Int8 `json:"parent_id"` + Wallet pgtype.Text `json:"wallet"` + FromToken string `json:"from_token"` + ToToken string `json:"to_token"` + Side OrderSide `json:"side"` + Condition OrderCondition `json:"condition"` + Price pgtype.Numeric `json:"price"` + Amount pgtype.Numeric `json:"amount"` + TwapTotalTime pgtype.Int4 `json:"twap_total_time"` + CreatedAt pgtype.Timestamptz `json:"created_at"` } func (q *Queries) InsertOrder(ctx context.Context, arg InsertOrderParams) (Order, error) { row := q.db.QueryRow(ctx, insertOrder, + arg.ParentID, arg.Wallet, arg.FromToken, arg.ToToken, arg.Side, arg.Condition, arg.Price, + arg.Amount, + arg.TwapTotalTime, arg.CreatedAt, ) var i Order err := row.Scan( &i.ID, + &i.ParentID, &i.Wallet, &i.FromToken, &i.ToToken, @@ -92,6 +135,8 @@ func (q *Queries) InsertOrder(ctx context.Context, arg InsertOrderParams) (Order &i.Side, &i.Condition, &i.Price, + &i.Amount, + &i.TwapTotalTime, &i.FilledAt, &i.CancelledAt, &i.CreatedAt, diff --git a/pkg/db/querier.go b/pkg/db/querier.go index 38de318..b9e0cea 100644 --- a/pkg/db/querier.go +++ b/pkg/db/querier.go @@ -13,7 +13,7 @@ type Querier interface { CreatePrice(ctx context.Context, arg CreatePriceParams) (Price, error) CreateToken(ctx context.Context, arg CreateTokenParams) (Token, error) GetMarketData(ctx context.Context, arg GetMarketDataParams) ([]GetMarketDataRow, error) - GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletParams) ([]Order, error) + GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletParams) ([]GetOrdersByWalletRow, error) GetPool(ctx context.Context, id string) (Pool, error) GetPools(ctx context.Context) ([]Pool, error) GetPriceByPoolID(ctx context.Context, poolID string) (Price, error) diff --git a/pkg/db/query/orders.sql b/pkg/db/query/orders.sql index 0ca94df..a34d91f 100644 --- a/pkg/db/query/orders.sql +++ b/pkg/db/query/orders.sql @@ -1,10 +1,11 @@ -- name: InsertOrder :one -INSERT INTO orders (wallet, from_token, to_token, side, condition, price, created_at) -VALUES ($1, $2, $3, $4, $5, $6, $7) +INSERT INTO orders (parent_id, wallet, from_token, to_token, side, condition, price, amount, twap_total_time, created_at) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING *; -- name: GetOrdersByWallet :many -SELECT * FROM orders -WHERE wallet = $1 -ORDER BY created_at DESC +SELECT o1.*, sqlc.embed(o2) FROM orders AS o1 +LEFT JOIN orders AS o2 ON o1.id = o2.parent_id AND o2.parent_id IS NOT NULL +WHERE o1.wallet = $1 +ORDER BY o1.created_at DESC LIMIT $2 OFFSET $3; From ddc3e617aedd8c0ffe92ea317a37f0020473e3b8 Mon Sep 17 00:00:00 2001 From: TP-O Date: Mon, 24 Feb 2025 20:56:33 +0700 Subject: [PATCH 03/10] feat: add fill parital order fnc --- internal/orders/services/order.go | 27 +++++++++++++++++++++++---- pkg/db/orders.sql.go | 10 ++++++++-- pkg/db/query/orders.sql | 4 ++-- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/internal/orders/services/order.go b/internal/orders/services/order.go index 35ad6e1..a31e238 100644 --- a/internal/orders/services/order.go +++ b/internal/orders/services/order.go @@ -2,7 +2,6 @@ package services import ( "context" - "github.com/jackc/pgx/v5/pgtype" "github.com/jinzhu/copier" "github.com/zuni-lab/dexon-service/pkg/custom" "github.com/zuni-lab/dexon-service/pkg/db" @@ -65,9 +64,7 @@ type CreateOrderBody struct { } func CreateOrder(ctx context.Context, body CreateOrderBody) (*db.Order, error) { - params := db.InsertOrderParams{ - CreatedAt: pgtype.Timestamptz{Time: time.Now(), Valid: true}, - } + var params db.InsertOrderParams if err := copier.Copy(¶ms, &body); err != nil { return nil, err } @@ -81,3 +78,25 @@ func CreateOrder(ctx context.Context, body CreateOrderBody) (*db.Order, error) { orderBook.Type(order.Side, order.Condition).ReplaceOrInsert(order) return &order, nil } + +func FillPartialOrder(ctx context.Context, parent db.Order, price, amount string) (*db.Order, error) { + var params db.InsertOrderParams + if err := copier.Copy(¶ms, &parent); err != nil { + return nil, err + } + + _ = params.ParentID.Scan(parent.ID) + _ = params.CreatedAt.Scan(time.Now()) + _ = params.Price.Scan(price) + _ = params.Amount.Scan(amount) + params.TwapTotalTime.Valid = false + params.Status = db.OrderStatusFILLED + params.FilledAt = params.CreatedAt + + order, err := db.DB.InsertOrder(ctx, params) + if err != nil { + return nil, err + } + + return &order, nil +} diff --git a/pkg/db/orders.sql.go b/pkg/db/orders.sql.go index 9bdb345..1c3d3b1 100644 --- a/pkg/db/orders.sql.go +++ b/pkg/db/orders.sql.go @@ -93,8 +93,8 @@ func (q *Queries) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletPa } const insertOrder = `-- name: InsertOrder :one -INSERT INTO orders (parent_id, wallet, from_token, to_token, side, condition, price, amount, twap_total_time, created_at) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) +INSERT INTO orders (parent_id, wallet, from_token, to_token, side, status,condition, price, amount, twap_total_time, filled_at, cancelled_at, created_at) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING id, parent_id, wallet, from_token, to_token, status, side, condition, price, amount, twap_total_time, filled_at, cancelled_at, created_at ` @@ -104,10 +104,13 @@ type InsertOrderParams struct { FromToken string `json:"from_token"` ToToken string `json:"to_token"` Side OrderSide `json:"side"` + Status OrderStatus `json:"status"` Condition OrderCondition `json:"condition"` Price pgtype.Numeric `json:"price"` Amount pgtype.Numeric `json:"amount"` TwapTotalTime pgtype.Int4 `json:"twap_total_time"` + FilledAt pgtype.Timestamptz `json:"filled_at"` + CancelledAt pgtype.Timestamptz `json:"cancelled_at"` CreatedAt pgtype.Timestamptz `json:"created_at"` } @@ -118,10 +121,13 @@ func (q *Queries) InsertOrder(ctx context.Context, arg InsertOrderParams) (Order arg.FromToken, arg.ToToken, arg.Side, + arg.Status, arg.Condition, arg.Price, arg.Amount, arg.TwapTotalTime, + arg.FilledAt, + arg.CancelledAt, arg.CreatedAt, ) var i Order diff --git a/pkg/db/query/orders.sql b/pkg/db/query/orders.sql index a34d91f..b79c9ff 100644 --- a/pkg/db/query/orders.sql +++ b/pkg/db/query/orders.sql @@ -1,6 +1,6 @@ -- name: InsertOrder :one -INSERT INTO orders (parent_id, wallet, from_token, to_token, side, condition, price, amount, twap_total_time, created_at) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) +INSERT INTO orders (parent_id, wallet, from_token, to_token, side, status,condition, price, amount, twap_total_time, filled_at, cancelled_at, created_at) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING *; -- name: GetOrdersByWallet :many From 0af5f577f18204f87008a09f747a037a6ecd1e9f Mon Sep 17 00:00:00 2001 From: TP-O Date: Tue, 25 Feb 2025 21:40:43 +0700 Subject: [PATCH 04/10] feat: update order schema --- cmd/worker/job/price_tracker.go | 229 ++++++++++++++++++ internal/orders/services/order.go | 33 ++- pkg/custom/order.go | 18 +- .../000002_create_orders_table.up.sql | 12 +- pkg/db/models.go | 92 +++---- pkg/db/orders.sql.go | 38 ++- pkg/db/pools.sql.go | 22 ++ pkg/db/querier.go | 1 + pkg/db/query/orders.sql | 4 +- pkg/db/query/pools.sql | 4 + 10 files changed, 357 insertions(+), 96 deletions(-) diff --git a/cmd/worker/job/price_tracker.go b/cmd/worker/job/price_tracker.go index e69de29..0d7bcdf 100644 --- a/cmd/worker/job/price_tracker.go +++ b/cmd/worker/job/price_tracker.go @@ -0,0 +1,229 @@ +package job + +import ( + "context" + "fmt" + "github.com/jackc/pgx/v5/pgtype" + "github.com/zuni-lab/dexon-service/pkg/custom" + "github.com/zuni-lab/dexon-service/pkg/utils" + "math" + "math/big" + "strings" + "sync" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/rs/zerolog/log" + "github.com/zuni-lab/dexon-service/config" + "github.com/zuni-lab/dexon-service/pkg/evm" +) + +type PriceTracker struct { + client *ethclient.Client + backoff *backoff.ExponentialBackOff + maxAttempts uint64 +} + +func NewPriceTracker() *PriceTracker { + b := backoff.NewExponentialBackOff() + b.InitialInterval = 1 * time.Second + b.MaxInterval = 1 * time.Minute + b.MaxElapsedTime = 30 * time.Minute + + return &PriceTracker{ + backoff: b, + maxAttempts: 100, + } +} + +func (p *PriceTracker) Start(ctx context.Context) error { + for { + if err := p.connect(); err != nil { + log.Error().Err(err).Msg("Failed to connect to Ethereum client") + continue + } + + if err := p.WatchPools(ctx); err != nil { + log.Error().Err(err).Msg("Error watching pools") + if p.client != nil { + p.client.Close() + p.client = nil + } + continue + } + + return nil + } +} + +func (p *PriceTracker) connect() error { + url := strings.Replace(config.Env.AlchemyUrl, "https", "wss", 1) + + var client *ethclient.Client + var err error + + operation := func() error { + log.Info().Msg("Attempting to connect to Ethereum client...") + client, err = ethclient.Dial(url) + if err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + return nil + } + + if err := backoff.Retry(operation, p.backoff); err != nil { + return err + } + + p.client = client + return nil +} + +func (p *PriceTracker) WatchPools(ctx context.Context) error { + pools := []common.Address{ + common.HexToAddress("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640"), + common.HexToAddress("0xc7bbec68d12a0d1830360f8ec58fa599ba1b0e9b"), + } + + errChan := make(chan error, len(pools)) + var wg sync.WaitGroup + + for _, pool := range pools { + wg.Add(1) + go func(pool common.Address) { + defer wg.Done() + if err := p.WatchPool(ctx, pool); err != nil { + select { + case errChan <- fmt.Errorf("pool %s: %w", pool.Hex(), err): + default: + } + } + }(pool) + } + + go func() { + wg.Wait() + close(errChan) + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errChan: + return err + } +} + +func (p *PriceTracker) WatchPool(ctx context.Context, pool common.Address) error { + for { + if err := p.watchPoolWithRetry(ctx, pool); err != nil { + if ctx.Err() != nil { + return ctx.Err() + } + log.Error(). + Err(err). + Str("pool", pool.Hex()). + Msg("Error watching pool, will retry") + continue + } + return nil + } +} + +func (p *PriceTracker) watchPoolWithRetry(ctx context.Context, pool common.Address) error { + log.Info().Msgf("Watching pool %s", pool.Hex()) + attempt := 0 +RETRY: + for attempt < int(p.maxAttempts) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + contract, err := evm.NewUniswapV3(pool, p.client) + if err != nil { + return fmt.Errorf("failed to create contract instance: %w", err) + } + + watchOpts := &bind.WatchOpts{Context: ctx} + sink := make(chan *evm.UniswapV3Swap) + + sub, err := contract.WatchSwap(watchOpts, sink, nil, nil) + if err != nil { + attempt++ + nextBackoff := p.backoff.NextBackOff() + if nextBackoff == backoff.Stop { + return fmt.Errorf("max elapsed time reached after %d attempts", attempt) + } + log.Warn(). + Err(err). + Int("attempt", attempt). + Dur("backoff", nextBackoff). + Str("pool", pool.Hex()). + Msg("Failed to watch swaps, retrying...") + time.Sleep(nextBackoff) + continue + } + defer sub.Unsubscribe() + + attempt = 0 + p.backoff.Reset() + + log.Info().Msg("🚀 Ready to watch pool") + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-sub.Err(): + if err != nil { + log.Warn(). + Err(err). + Str("pool", pool.Hex()). + Msg("Subscription error, reconnecting...") + goto RETRY + } + case event := <-sink: + if err := p.handleEvent(event); err != nil { + log.Error(). + Err(err). + Str("pool", pool.Hex()). + Msg("Error handling event") + } + } + } + } + + return fmt.Errorf("failed to maintain subscription after %d attempts", p.maxAttempts) +} + +func (p *PriceTracker) handleEvent(event *evm.UniswapV3Swap) error { + price := utils.CalculatePrice(event.SqrtPriceX96) + priceInt := new(big.Int) + price.Int(priceInt) + priceExp2 := new(big.Float).MantExp(price) + priceExp10 := int32(math.Floor(float64(priceExp2) * math.Log10(2))) + + orderBook := custom.GetOrderBook() + matchedOrders := orderBook.Match( + pgtype.Numeric{ + Int: priceInt, + Exp: priceExp10, + Valid: true, + }, + pgtype.Timestamptz{ + Time: time.Now(), + }, + ) + + for _, order := range matchedOrders { + orderBook.Sub(order.Side, order.Type).Delete(*order) + } + + log.Info().Any("event", event).Msgf("[PriceTracker] [HandleEvent] handled %s event", event.Raw.Address.Hex()) + return nil +} diff --git a/internal/orders/services/order.go b/internal/orders/services/order.go index a31e238..cfce8f6 100644 --- a/internal/orders/services/order.go +++ b/internal/orders/services/order.go @@ -53,18 +53,31 @@ func ListOrderByWallet(ctx context.Context, query ListOrdersByWalletQuery) ([]Li } type CreateOrderBody struct { - Wallet string `json:"wallet" validate:"eth_addr"` - FromToken string `json:"from_token" validate:"eth_addr"` - ToToken string `json:"to_token" validate:"eth_addr"` - Side db.OrderSide `json:"side" validate:"oneof=BUY SELL"` - Condition db.OrderCondition `json:"condition" validate:"oneof=LIMIT STOP TWAP"` - Price string `json:"price" validate:"numeric,gt=0"` - Amount string `json:"amount" validate:"numeric,gt=0"` - TwapTotalTime *int32 `json:"twap_total_time" validate:"omitempty,gt=0"` + Wallet string `json:"wallet" validate:"eth_addr"` + Token0 string `json:"token0" validate:"eth_addr"` + Token1 string `json:"token1" validate:"eth_addr"` + Side db.OrderSide `json:"side" validate:"oneof=BUY SELL"` + Type db.OrderType `json:"type" validate:"oneof=MARKET LIMIT STOP TWAP"` + Price string `json:"price" validate:"numeric,gt=0"` + Amount string `json:"amount" validate:"numeric,gt=0"` + TwapTotalTime *int32 `json:"twap_total_time" validate:"omitempty,gt=0"` } func CreateOrder(ctx context.Context, body CreateOrderBody) (*db.Order, error) { - var params db.InsertOrderParams + var ( + pool db.Pool + params db.InsertOrderParams + ) + + pool, err := db.DB.GetPoolByToken(ctx, db.GetPoolByTokenParams{ + Token0ID: body.Token0, + Token1ID: body.Token1, + }) + if err != nil { + return nil, err + } + + params.PoolID = pool.ID if err := copier.Copy(¶ms, &body); err != nil { return nil, err } @@ -75,7 +88,7 @@ func CreateOrder(ctx context.Context, body CreateOrderBody) (*db.Order, error) { } orderBook := custom.GetOrderBook() - orderBook.Type(order.Side, order.Condition).ReplaceOrInsert(order) + orderBook.Sub(order.Side, order.Type).ReplaceOrInsert(order) return &order, nil } diff --git a/pkg/custom/order.go b/pkg/custom/order.go index 5fcade8..4be7d51 100644 --- a/pkg/custom/order.go +++ b/pkg/custom/order.go @@ -52,14 +52,14 @@ func newOrderTree() *OrderTree { } } -func (o *OrderBook) Type(side db.OrderSide, condition db.OrderCondition) *OrderTree { - if side == db.OrderSideBUY && condition == db.OrderConditionLIMIT { +func (o *OrderBook) Sub(side db.OrderSide, oType db.OrderType) *OrderTree { + if side == db.OrderSideBUY && oType == db.OrderTypeLIMIT { return o.limitBuy - } else if side == db.OrderSideBUY && condition == db.OrderConditionSTOP { + } else if side == db.OrderSideBUY && oType == db.OrderTypeSTOP { return o.stopBuy - } else if side == db.OrderSideSELL && condition == db.OrderConditionLIMIT { + } else if side == db.OrderSideSELL && oType == db.OrderTypeLIMIT { return o.limitSell - } else if side == db.OrderSideSELL && condition == db.OrderConditionSTOP { + } else if side == db.OrderSideSELL && oType == db.OrderTypeSTOP { return o.stopSell } else { return nil @@ -109,22 +109,22 @@ func (o *OrderBook) Match(price pgtype.Numeric, priceTime pgtype.Timestamptz) [] } ) - orderBook.Type(db.OrderSideBUY, db.OrderConditionSTOP).AscendGreaterOrEqual(expected, func(order db.Order) bool { + orderBook.Type(db.OrderSideBUY, db.OrderTypeSTOP).AscendGreaterOrEqual(expected, func(order db.Order) bool { stopBuyOrder = &order return false }) - orderBook.Type(db.OrderSideSELL, db.OrderConditionLIMIT).AscendGreaterOrEqual(expected, func(order db.Order) bool { + orderBook.Type(db.OrderSideSELL, db.OrderTypeLIMIT).AscendGreaterOrEqual(expected, func(order db.Order) bool { limitSellOrder = &order return false }) - orderBook.Type(db.OrderSideBUY, db.OrderConditionLIMIT).DescendLessOrEqual(expected, func(order db.Order) bool { + orderBook.Type(db.OrderSideBUY, db.OrderTypeLIMIT).DescendLessOrEqual(expected, func(order db.Order) bool { limitBuyOrder = &order return false }) - orderBook.Type(db.OrderSideSELL, db.OrderConditionSTOP).DescendLessOrEqual(expected, func(order db.Order) bool { + orderBook.Type(db.OrderSideSELL, db.OrderTypeSTOP).DescendLessOrEqual(expected, func(order db.Order) bool { stopSellOrder = &order return false }) diff --git a/pkg/db/migration/000002_create_orders_table.up.sql b/pkg/db/migration/000002_create_orders_table.up.sql index fdad188..2cc1cc4 100644 --- a/pkg/db/migration/000002_create_orders_table.up.sql +++ b/pkg/db/migration/000002_create_orders_table.up.sql @@ -1,16 +1,15 @@ CREATE TYPE ORDER_STATUS AS ENUM ('PENDING', 'PARTIAL_FILLED' ,'FILLED', 'REJECTED', 'CANCELED'); CREATE TYPE ORDER_SIDE AS ENUM ('BUY', 'SELL'); -CREATE TYPE ORDER_CONDITION AS ENUM ('LIMIT', 'STOP', 'TWAP'); +CREATE TYPE ORDER_TYPE AS ENUM ('MARKET', 'LIMIT', 'STOP', 'TWAP'); CREATE TABLE IF NOT EXISTS orders ( id BIGSERIAL PRIMARY KEY, + pool_id VARCHAR(42) NOT NULL, parent_id BIGINT, wallet VARCHAR(42), - from_token VARCHAR(10) NOT NULL, - to_token VARCHAR(10) NOT NULL, status ORDER_STATUS NOT NULL DEFAULT 'PENDING', side ORDER_SIDE NOT NULL, - condition ORDER_CONDITION NOT NULL, + type ORDER_TYPE NOT NULL, price NUMERIC(78,18) NOT NULL, amount NUMERIC(78,18) NOT NULL, twap_total_time INT, @@ -18,7 +17,6 @@ CREATE TABLE IF NOT EXISTS orders ( cancelled_at TIMESTAMP WITH TIME ZONE, created_at TIMESTAMP WITH TIME ZONE, - FOREIGN KEY (parent_id) REFERENCES orders(id) ON DELETE CASCADE, - FOREIGN KEY (from_token) REFERENCES tokens(id) ON DELETE CASCADE, - FOREIGN KEY (to_token) REFERENCES tokens(id) ON DELETE CASCADE + FOREIGN KEY (pool_id) REFERENCES pools(id) ON DELETE CASCADE, + FOREIGN KEY (parent_id) REFERENCES orders(id) ON DELETE CASCADE ); diff --git a/pkg/db/models.go b/pkg/db/models.go index 6cb0800..6e7f417 100644 --- a/pkg/db/models.go +++ b/pkg/db/models.go @@ -11,49 +11,6 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) -type OrderCondition string - -const ( - OrderConditionLIMIT OrderCondition = "LIMIT" - OrderConditionSTOP OrderCondition = "STOP" - OrderConditionTWAP OrderCondition = "TWAP" -) - -func (e *OrderCondition) Scan(src interface{}) error { - switch s := src.(type) { - case []byte: - *e = OrderCondition(s) - case string: - *e = OrderCondition(s) - default: - return fmt.Errorf("unsupported scan type for OrderCondition: %T", src) - } - return nil -} - -type NullOrderCondition struct { - OrderCondition OrderCondition `json:"order_condition"` - Valid bool `json:"valid"` // Valid is true if OrderCondition is not NULL -} - -// Scan implements the Scanner interface. -func (ns *NullOrderCondition) Scan(value interface{}) error { - if value == nil { - ns.OrderCondition, ns.Valid = "", false - return nil - } - ns.Valid = true - return ns.OrderCondition.Scan(value) -} - -// Value implements the driver Valuer interface. -func (ns NullOrderCondition) Value() (driver.Value, error) { - if !ns.Valid { - return nil, nil - } - return string(ns.OrderCondition), nil -} - type OrderSide string const ( @@ -141,15 +98,58 @@ func (ns NullOrderStatus) Value() (driver.Value, error) { return string(ns.OrderStatus), nil } +type OrderType string + +const ( + OrderTypeMARKET OrderType = "MARKET" + OrderTypeLIMIT OrderType = "LIMIT" + OrderTypeSTOP OrderType = "STOP" + OrderTypeTWAP OrderType = "TWAP" +) + +func (e *OrderType) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = OrderType(s) + case string: + *e = OrderType(s) + default: + return fmt.Errorf("unsupported scan type for OrderType: %T", src) + } + return nil +} + +type NullOrderType struct { + OrderType OrderType `json:"order_type"` + Valid bool `json:"valid"` // Valid is true if OrderType is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullOrderType) Scan(value interface{}) error { + if value == nil { + ns.OrderType, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.OrderType.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullOrderType) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.OrderType), nil +} + type Order struct { ID int64 `json:"id"` + PoolID string `json:"pool_id"` ParentID pgtype.Int8 `json:"parent_id"` Wallet pgtype.Text `json:"wallet"` - FromToken string `json:"from_token"` - ToToken string `json:"to_token"` Status OrderStatus `json:"status"` Side OrderSide `json:"side"` - Condition OrderCondition `json:"condition"` + Type OrderType `json:"type"` Price pgtype.Numeric `json:"price"` Amount pgtype.Numeric `json:"amount"` TwapTotalTime pgtype.Int4 `json:"twap_total_time"` diff --git a/pkg/db/orders.sql.go b/pkg/db/orders.sql.go index 1c3d3b1..474f342 100644 --- a/pkg/db/orders.sql.go +++ b/pkg/db/orders.sql.go @@ -12,7 +12,7 @@ import ( ) const getOrdersByWallet = `-- name: GetOrdersByWallet :many -SELECT o1.id, o1.parent_id, o1.wallet, o1.from_token, o1.to_token, o1.status, o1.side, o1.condition, o1.price, o1.amount, o1.twap_total_time, o1.filled_at, o1.cancelled_at, o1.created_at, o2.id, o2.parent_id, o2.wallet, o2.from_token, o2.to_token, o2.status, o2.side, o2.condition, o2.price, o2.amount, o2.twap_total_time, o2.filled_at, o2.cancelled_at, o2.created_at FROM orders AS o1 +SELECT o1.id, o1.pool_id, o1.parent_id, o1.wallet, o1.status, o1.side, o1.type, o1.price, o1.amount, o1.twap_total_time, o1.filled_at, o1.cancelled_at, o1.created_at, o2.id, o2.pool_id, o2.parent_id, o2.wallet, o2.status, o2.side, o2.type, o2.price, o2.amount, o2.twap_total_time, o2.filled_at, o2.cancelled_at, o2.created_at FROM orders AS o1 LEFT JOIN orders AS o2 ON o1.id = o2.parent_id AND o2.parent_id IS NOT NULL WHERE o1.wallet = $1 ORDER BY o1.created_at DESC @@ -27,13 +27,12 @@ type GetOrdersByWalletParams struct { type GetOrdersByWalletRow struct { ID int64 `json:"id"` + PoolID string `json:"pool_id"` ParentID pgtype.Int8 `json:"parent_id"` Wallet pgtype.Text `json:"wallet"` - FromToken string `json:"from_token"` - ToToken string `json:"to_token"` Status OrderStatus `json:"status"` Side OrderSide `json:"side"` - Condition OrderCondition `json:"condition"` + Type OrderType `json:"type"` Price pgtype.Numeric `json:"price"` Amount pgtype.Numeric `json:"amount"` TwapTotalTime pgtype.Int4 `json:"twap_total_time"` @@ -54,13 +53,12 @@ func (q *Queries) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletPa var i GetOrdersByWalletRow if err := rows.Scan( &i.ID, + &i.PoolID, &i.ParentID, &i.Wallet, - &i.FromToken, - &i.ToToken, &i.Status, &i.Side, - &i.Condition, + &i.Type, &i.Price, &i.Amount, &i.TwapTotalTime, @@ -68,13 +66,12 @@ func (q *Queries) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletPa &i.CancelledAt, &i.CreatedAt, &i.Order.ID, + &i.Order.PoolID, &i.Order.ParentID, &i.Order.Wallet, - &i.Order.FromToken, - &i.Order.ToToken, &i.Order.Status, &i.Order.Side, - &i.Order.Condition, + &i.Order.Type, &i.Order.Price, &i.Order.Amount, &i.Order.TwapTotalTime, @@ -93,19 +90,18 @@ func (q *Queries) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletPa } const insertOrder = `-- name: InsertOrder :one -INSERT INTO orders (parent_id, wallet, from_token, to_token, side, status,condition, price, amount, twap_total_time, filled_at, cancelled_at, created_at) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) -RETURNING id, parent_id, wallet, from_token, to_token, status, side, condition, price, amount, twap_total_time, filled_at, cancelled_at, created_at +INSERT INTO orders (parent_id, wallet, pool_id, side, status, type, price, amount, twap_total_time, filled_at, cancelled_at, created_at) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) +RETURNING id, pool_id, parent_id, wallet, status, side, type, price, amount, twap_total_time, filled_at, cancelled_at, created_at ` type InsertOrderParams struct { ParentID pgtype.Int8 `json:"parent_id"` Wallet pgtype.Text `json:"wallet"` - FromToken string `json:"from_token"` - ToToken string `json:"to_token"` + PoolID string `json:"pool_id"` Side OrderSide `json:"side"` Status OrderStatus `json:"status"` - Condition OrderCondition `json:"condition"` + Type OrderType `json:"type"` Price pgtype.Numeric `json:"price"` Amount pgtype.Numeric `json:"amount"` TwapTotalTime pgtype.Int4 `json:"twap_total_time"` @@ -118,11 +114,10 @@ func (q *Queries) InsertOrder(ctx context.Context, arg InsertOrderParams) (Order row := q.db.QueryRow(ctx, insertOrder, arg.ParentID, arg.Wallet, - arg.FromToken, - arg.ToToken, + arg.PoolID, arg.Side, arg.Status, - arg.Condition, + arg.Type, arg.Price, arg.Amount, arg.TwapTotalTime, @@ -133,13 +128,12 @@ func (q *Queries) InsertOrder(ctx context.Context, arg InsertOrderParams) (Order var i Order err := row.Scan( &i.ID, + &i.PoolID, &i.ParentID, &i.Wallet, - &i.FromToken, - &i.ToToken, &i.Status, &i.Side, - &i.Condition, + &i.Type, &i.Price, &i.Amount, &i.TwapTotalTime, diff --git a/pkg/db/pools.sql.go b/pkg/db/pools.sql.go index 8fc66c4..493df86 100644 --- a/pkg/db/pools.sql.go +++ b/pkg/db/pools.sql.go @@ -50,6 +50,28 @@ func (q *Queries) GetPool(ctx context.Context, id string) (Pool, error) { return i, err } +const getPoolByToken = `-- name: GetPoolByToken :one +SELECT id, token0_id, token1_id, created_at FROM pools +WHERE token0_id = $1 AND token1_id = $2 LIMIT 1 +` + +type GetPoolByTokenParams struct { + Token0ID string `json:"token0_id"` + Token1ID string `json:"token1_id"` +} + +func (q *Queries) GetPoolByToken(ctx context.Context, arg GetPoolByTokenParams) (Pool, error) { + row := q.db.QueryRow(ctx, getPoolByToken, arg.Token0ID, arg.Token1ID) + var i Pool + err := row.Scan( + &i.ID, + &i.Token0ID, + &i.Token1ID, + &i.CreatedAt, + ) + return i, err +} + const getPools = `-- name: GetPools :many SELECT id, token0_id, token1_id, created_at FROM pools ` diff --git a/pkg/db/querier.go b/pkg/db/querier.go index b9e0cea..5ef052c 100644 --- a/pkg/db/querier.go +++ b/pkg/db/querier.go @@ -15,6 +15,7 @@ type Querier interface { GetMarketData(ctx context.Context, arg GetMarketDataParams) ([]GetMarketDataRow, error) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletParams) ([]GetOrdersByWalletRow, error) GetPool(ctx context.Context, id string) (Pool, error) + GetPoolByToken(ctx context.Context, arg GetPoolByTokenParams) (Pool, error) GetPools(ctx context.Context) ([]Pool, error) GetPriceByPoolID(ctx context.Context, poolID string) (Price, error) GetPrices(ctx context.Context, arg GetPricesParams) ([]Price, error) diff --git a/pkg/db/query/orders.sql b/pkg/db/query/orders.sql index b79c9ff..ae21484 100644 --- a/pkg/db/query/orders.sql +++ b/pkg/db/query/orders.sql @@ -1,6 +1,6 @@ -- name: InsertOrder :one -INSERT INTO orders (parent_id, wallet, from_token, to_token, side, status,condition, price, amount, twap_total_time, filled_at, cancelled_at, created_at) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) +INSERT INTO orders (parent_id, wallet, pool_id, side, status, type, price, amount, twap_total_time, filled_at, cancelled_at, created_at) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING *; -- name: GetOrdersByWallet :many diff --git a/pkg/db/query/pools.sql b/pkg/db/query/pools.sql index 6596de0..fa0bc6b 100644 --- a/pkg/db/query/pools.sql +++ b/pkg/db/query/pools.sql @@ -5,6 +5,10 @@ SELECT * FROM pools; SELECT * FROM pools WHERE id = $1 LIMIT 1; +-- name: GetPoolByToken :one +SELECT * FROM pools +WHERE token0_id = $1 AND token1_id = $2 LIMIT 1; + -- name: CreatePool :one INSERT INTO pools (id, token0_id, token1_id) VALUES ($1, $2, $3) From 791f23274b0bd61c0788f60cbdc203b536e82d2e Mon Sep 17 00:00:00 2001 From: TP-O Date: Tue, 25 Feb 2025 21:44:00 +0700 Subject: [PATCH 05/10] feat: fill market order --- internal/orders/services/order.go | 8 ++++++-- pkg/db/migration/000002_create_orders_table.up.sql | 2 +- pkg/db/models.go | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/internal/orders/services/order.go b/internal/orders/services/order.go index cfce8f6..3690f07 100644 --- a/internal/orders/services/order.go +++ b/internal/orders/services/order.go @@ -69,6 +69,10 @@ func CreateOrder(ctx context.Context, body CreateOrderBody) (*db.Order, error) { params db.InsertOrderParams ) + if err := copier.Copy(¶ms, &body); err != nil { + return nil, err + } + pool, err := db.DB.GetPoolByToken(ctx, db.GetPoolByTokenParams{ Token0ID: body.Token0, Token1ID: body.Token1, @@ -78,8 +82,8 @@ func CreateOrder(ctx context.Context, body CreateOrderBody) (*db.Order, error) { } params.PoolID = pool.ID - if err := copier.Copy(¶ms, &body); err != nil { - return nil, err + if params.Type == db.OrderTypeMARKET { + params.Status = db.OrderStatusFILLED } order, err := db.DB.InsertOrder(ctx, params) diff --git a/pkg/db/migration/000002_create_orders_table.up.sql b/pkg/db/migration/000002_create_orders_table.up.sql index 2cc1cc4..2a8035f 100644 --- a/pkg/db/migration/000002_create_orders_table.up.sql +++ b/pkg/db/migration/000002_create_orders_table.up.sql @@ -1,4 +1,4 @@ -CREATE TYPE ORDER_STATUS AS ENUM ('PENDING', 'PARTIAL_FILLED' ,'FILLED', 'REJECTED', 'CANCELED'); +CREATE TYPE ORDER_STATUS AS ENUM ('PENDING', 'PARTIAL_FILLED' ,'FILLED', 'REJECTED', 'CANCELLED'); CREATE TYPE ORDER_SIDE AS ENUM ('BUY', 'SELL'); CREATE TYPE ORDER_TYPE AS ENUM ('MARKET', 'LIMIT', 'STOP', 'TWAP'); diff --git a/pkg/db/models.go b/pkg/db/models.go index 6e7f417..19e1cf9 100644 --- a/pkg/db/models.go +++ b/pkg/db/models.go @@ -60,7 +60,7 @@ const ( OrderStatusPARTIALFILLED OrderStatus = "PARTIAL_FILLED" OrderStatusFILLED OrderStatus = "FILLED" OrderStatusREJECTED OrderStatus = "REJECTED" - OrderStatusCANCELED OrderStatus = "CANCELED" + OrderStatusCANCELLED OrderStatus = "CANCELLED" ) func (e *OrderStatus) Scan(src interface{}) error { From 797504f49970ee7f6131bca878a08af7077abac2 Mon Sep 17 00:00:00 2001 From: TP-O Date: Tue, 25 Feb 2025 21:45:06 +0700 Subject: [PATCH 06/10] fix: typo --- pkg/custom/order.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/custom/order.go b/pkg/custom/order.go index 4be7d51..4180458 100644 --- a/pkg/custom/order.go +++ b/pkg/custom/order.go @@ -109,22 +109,22 @@ func (o *OrderBook) Match(price pgtype.Numeric, priceTime pgtype.Timestamptz) [] } ) - orderBook.Type(db.OrderSideBUY, db.OrderTypeSTOP).AscendGreaterOrEqual(expected, func(order db.Order) bool { + orderBook.Sub(db.OrderSideBUY, db.OrderTypeSTOP).AscendGreaterOrEqual(expected, func(order db.Order) bool { stopBuyOrder = &order return false }) - orderBook.Type(db.OrderSideSELL, db.OrderTypeLIMIT).AscendGreaterOrEqual(expected, func(order db.Order) bool { + orderBook.Sub(db.OrderSideSELL, db.OrderTypeLIMIT).AscendGreaterOrEqual(expected, func(order db.Order) bool { limitSellOrder = &order return false }) - orderBook.Type(db.OrderSideBUY, db.OrderTypeLIMIT).DescendLessOrEqual(expected, func(order db.Order) bool { + orderBook.Sub(db.OrderSideBUY, db.OrderTypeLIMIT).DescendLessOrEqual(expected, func(order db.Order) bool { limitBuyOrder = &order return false }) - orderBook.Type(db.OrderSideSELL, db.OrderTypeSTOP).DescendLessOrEqual(expected, func(order db.Order) bool { + orderBook.Sub(db.OrderSideSELL, db.OrderTypeSTOP).DescendLessOrEqual(expected, func(order db.Order) bool { stopSellOrder = &order return false }) From 02963c59a0e0e8c0b915c0b3eaa8e53c149f0f7e Mon Sep 17 00:00:00 2001 From: TP-O Date: Tue, 25 Feb 2025 21:53:57 +0700 Subject: [PATCH 07/10] feat: load orders to orderBook when app started --- pkg/custom/order.go | 22 ++++++++++++++++++---- pkg/db/orders.sql.go | 39 +++++++++++++++++++++++++++++++++++++++ pkg/db/querier.go | 1 + pkg/db/query/orders.sql | 4 ++++ 4 files changed, 62 insertions(+), 4 deletions(-) diff --git a/pkg/custom/order.go b/pkg/custom/order.go index 4180458..06a258d 100644 --- a/pkg/custom/order.go +++ b/pkg/custom/order.go @@ -1,6 +1,7 @@ package custom import ( + "context" "github.com/google/btree" "github.com/jackc/pgx/v5/pgtype" "github.com/zuni-lab/dexon-service/pkg/db" @@ -35,6 +36,19 @@ func GetOrderBook() *OrderBook { return orderBook } +func init() { + ctx := context.Background() + orders, err := db.DB.GetOrdersByStatus(ctx, []string{string(db.OrderStatusPENDING), string(db.OrderStatusPARTIALFILLED)}) + if err != nil { + panic(err) + } + + orderBook := GetOrderBook() + for _, order := range orders { + orderBook.Sub(order.Side, order.Type).ReplaceOrInsert(order) + } +} + func compareOrder(a, b db.Order) bool { sub := new(big.Int).Sub(a.Price.Int, b.Price.Int).Sign() if sub < 0 { @@ -69,19 +83,19 @@ func (o *OrderBook) Sub(side db.OrderSide, oType db.OrderType) *OrderTree { func (o *OrderTree) ReplaceOrInsert(order db.Order) { o.mux.Lock() defer o.mux.Unlock() - o.ReplaceOrInsert(order) + o.BTreeG.ReplaceOrInsert(order) } -func (o *OrderTree) Get(order db.Order) db.Order { +func (o *OrderTree) Get(order db.Order) (db.Order, bool) { o.mux.RLock() defer o.mux.RUnlock() - return o.Get(order) + return o.BTreeG.Get(order) } func (o *OrderTree) Delete(order db.Order) { o.mux.Lock() defer o.mux.Unlock() - o.Delete(order) + o.BTreeG.Delete(order) } func (o *OrderBook) Match(price pgtype.Numeric, priceTime pgtype.Timestamptz) []*db.Order { diff --git a/pkg/db/orders.sql.go b/pkg/db/orders.sql.go index 474f342..efc3502 100644 --- a/pkg/db/orders.sql.go +++ b/pkg/db/orders.sql.go @@ -11,6 +11,45 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +const getOrdersByStatus = `-- name: GetOrdersByStatus :many +SELECT id, pool_id, parent_id, wallet, status, side, type, price, amount, twap_total_time, filled_at, cancelled_at, created_at FROM orders +WHERE status = ANY($1::varchar[]) +` + +func (q *Queries) GetOrdersByStatus(ctx context.Context, status []string) ([]Order, error) { + rows, err := q.db.Query(ctx, getOrdersByStatus, status) + if err != nil { + return nil, err + } + defer rows.Close() + items := []Order{} + for rows.Next() { + var i Order + if err := rows.Scan( + &i.ID, + &i.PoolID, + &i.ParentID, + &i.Wallet, + &i.Status, + &i.Side, + &i.Type, + &i.Price, + &i.Amount, + &i.TwapTotalTime, + &i.FilledAt, + &i.CancelledAt, + &i.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getOrdersByWallet = `-- name: GetOrdersByWallet :many SELECT o1.id, o1.pool_id, o1.parent_id, o1.wallet, o1.status, o1.side, o1.type, o1.price, o1.amount, o1.twap_total_time, o1.filled_at, o1.cancelled_at, o1.created_at, o2.id, o2.pool_id, o2.parent_id, o2.wallet, o2.status, o2.side, o2.type, o2.price, o2.amount, o2.twap_total_time, o2.filled_at, o2.cancelled_at, o2.created_at FROM orders AS o1 LEFT JOIN orders AS o2 ON o1.id = o2.parent_id AND o2.parent_id IS NOT NULL diff --git a/pkg/db/querier.go b/pkg/db/querier.go index 5ef052c..364cfee 100644 --- a/pkg/db/querier.go +++ b/pkg/db/querier.go @@ -13,6 +13,7 @@ type Querier interface { CreatePrice(ctx context.Context, arg CreatePriceParams) (Price, error) CreateToken(ctx context.Context, arg CreateTokenParams) (Token, error) GetMarketData(ctx context.Context, arg GetMarketDataParams) ([]GetMarketDataRow, error) + GetOrdersByStatus(ctx context.Context, status []string) ([]Order, error) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletParams) ([]GetOrdersByWalletRow, error) GetPool(ctx context.Context, id string) (Pool, error) GetPoolByToken(ctx context.Context, arg GetPoolByTokenParams) (Pool, error) diff --git a/pkg/db/query/orders.sql b/pkg/db/query/orders.sql index ae21484..9c8cadd 100644 --- a/pkg/db/query/orders.sql +++ b/pkg/db/query/orders.sql @@ -9,3 +9,7 @@ LEFT JOIN orders AS o2 ON o1.id = o2.parent_id AND o2.parent_id IS NOT NULL WHERE o1.wallet = $1 ORDER BY o1.created_at DESC LIMIT $2 OFFSET $3; + +-- name: GetOrdersByStatus :many +SELECT * FROM orders +WHERE status = ANY(@status::varchar[]); From d867da62cfd41ec6612fe91f48001fb34c8e410a Mon Sep 17 00:00:00 2001 From: TP-O Date: Wed, 26 Feb 2025 18:01:24 +0700 Subject: [PATCH 08/10] fix: rm btree --- cmd/worker/job/price_tracker.go | 27 +-- internal/orders/services/order.go | 46 ++++- pkg/custom/order.go | 158 ---------------- .../000002_create_orders_table.up.sql | 4 +- pkg/db/models.go | 28 +-- pkg/db/orders.sql.go | 179 ++++++++++++++---- pkg/db/querier.go | 4 + pkg/db/query/orders.sql | 31 ++- 8 files changed, 237 insertions(+), 240 deletions(-) delete mode 100644 pkg/custom/order.go diff --git a/cmd/worker/job/price_tracker.go b/cmd/worker/job/price_tracker.go index 0d7bcdf..402f1d7 100644 --- a/cmd/worker/job/price_tracker.go +++ b/cmd/worker/job/price_tracker.go @@ -3,11 +3,8 @@ package job import ( "context" "fmt" - "github.com/jackc/pgx/v5/pgtype" - "github.com/zuni-lab/dexon-service/pkg/custom" + "github.com/zuni-lab/dexon-service/internal/orders/services" "github.com/zuni-lab/dexon-service/pkg/utils" - "math" - "math/big" "strings" "sync" "time" @@ -203,25 +200,9 @@ RETRY: func (p *PriceTracker) handleEvent(event *evm.UniswapV3Swap) error { price := utils.CalculatePrice(event.SqrtPriceX96) - priceInt := new(big.Int) - price.Int(priceInt) - priceExp2 := new(big.Float).MantExp(price) - priceExp10 := int32(math.Floor(float64(priceExp2) * math.Log10(2))) - - orderBook := custom.GetOrderBook() - matchedOrders := orderBook.Match( - pgtype.Numeric{ - Int: priceInt, - Exp: priceExp10, - Valid: true, - }, - pgtype.Timestamptz{ - Time: time.Now(), - }, - ) - - for _, order := range matchedOrders { - orderBook.Sub(order.Side, order.Type).Delete(*order) + _, err := services.MatchOrder(context.Background(), price.String()) + if err != nil { + log.Info().Any("event", event).Err(err).Msgf("[PriceTracker] [HandleEvent] failed to match order") } log.Info().Any("event", event).Msgf("[PriceTracker] [HandleEvent] handled %s event", event.Raw.Address.Hex()) diff --git a/internal/orders/services/order.go b/internal/orders/services/order.go index 3690f07..77292f3 100644 --- a/internal/orders/services/order.go +++ b/internal/orders/services/order.go @@ -2,8 +2,9 @@ package services import ( "context" + "errors" + "github.com/jackc/pgx/v5/pgtype" "github.com/jinzhu/copier" - "github.com/zuni-lab/dexon-service/pkg/custom" "github.com/zuni-lab/dexon-service/pkg/db" "slices" "time" @@ -84,6 +85,7 @@ func CreateOrder(ctx context.Context, body CreateOrderBody) (*db.Order, error) { params.PoolID = pool.ID if params.Type == db.OrderTypeMARKET { params.Status = db.OrderStatusFILLED + _ = params.FilledAt.Scan(time.Now()) } order, err := db.DB.InsertOrder(ctx, params) @@ -91,8 +93,6 @@ func CreateOrder(ctx context.Context, body CreateOrderBody) (*db.Order, error) { return nil, err } - orderBook := custom.GetOrderBook() - orderBook.Sub(order.Side, order.Type).ReplaceOrInsert(order) return &order, nil } @@ -103,12 +103,10 @@ func FillPartialOrder(ctx context.Context, parent db.Order, price, amount string } _ = params.ParentID.Scan(parent.ID) - _ = params.CreatedAt.Scan(time.Now()) + _ = params.FilledAt.Scan(time.Now()) _ = params.Price.Scan(price) _ = params.Amount.Scan(amount) - params.TwapTotalTime.Valid = false params.Status = db.OrderStatusFILLED - params.FilledAt = params.CreatedAt order, err := db.DB.InsertOrder(ctx, params) if err != nil { @@ -117,3 +115,39 @@ func FillPartialOrder(ctx context.Context, parent db.Order, price, amount string return &order, nil } + +func MatchOrder(ctx context.Context, price string) (*db.Order, error) { + var numericPrice pgtype.Numeric + err := numericPrice.Scan(price) + if err != nil { + return nil, err + } + + order, err := db.DB.GetMatchedOrder(ctx, numericPrice) + if err != nil { + return nil, err + } + + // TODO: Call to contract + + params := db.UpdateOrderParams{ + ID: order.ID, + } + if order.Type == db.OrderTypeLIMIT || + order.Type == db.OrderTypeSTOP || + order.Type == db.OrderTypeTWAP && order.TwapAmount.Int.Cmp(order.Amount.Int) == 0 { + _ = params.FilledAt.Scan(time.Now()) + params.Status = db.OrderStatusFILLED + } else if order.Type == db.OrderTypeTWAP { + params.Status = db.OrderStatusPARTIALFILLED + } else { + return nil, errors.New("invalid order") + } + + order, err = db.DB.UpdateOrder(ctx, params) + if err != nil { + return nil, err + } + + return &order, nil +} diff --git a/pkg/custom/order.go b/pkg/custom/order.go deleted file mode 100644 index 06a258d..0000000 --- a/pkg/custom/order.go +++ /dev/null @@ -1,158 +0,0 @@ -package custom - -import ( - "context" - "github.com/google/btree" - "github.com/jackc/pgx/v5/pgtype" - "github.com/zuni-lab/dexon-service/pkg/db" - "math/big" - "sync" -) - -var orderBook *OrderBook - -type OrderBook struct { - limitBuy *OrderTree - limitSell *OrderTree - stopBuy *OrderTree - stopSell *OrderTree -} - -type OrderTree struct { - btree.BTreeG[db.Order] - mux sync.RWMutex -} - -func GetOrderBook() *OrderBook { - sync.OnceFunc(func() { - orderBook = &OrderBook{ - limitBuy: newOrderTree(), - limitSell: newOrderTree(), - stopBuy: newOrderTree(), - stopSell: newOrderTree(), - } - })() - - return orderBook -} - -func init() { - ctx := context.Background() - orders, err := db.DB.GetOrdersByStatus(ctx, []string{string(db.OrderStatusPENDING), string(db.OrderStatusPARTIALFILLED)}) - if err != nil { - panic(err) - } - - orderBook := GetOrderBook() - for _, order := range orders { - orderBook.Sub(order.Side, order.Type).ReplaceOrInsert(order) - } -} - -func compareOrder(a, b db.Order) bool { - sub := new(big.Int).Sub(a.Price.Int, b.Price.Int).Sign() - if sub < 0 { - return true - } else if sub > 0 { - return false - } else { - return a.CreatedAt.Time.After(b.CreatedAt.Time) - } -} - -func newOrderTree() *OrderTree { - return &OrderTree{ - BTreeG: *btree.NewG[db.Order](32, compareOrder), - } -} - -func (o *OrderBook) Sub(side db.OrderSide, oType db.OrderType) *OrderTree { - if side == db.OrderSideBUY && oType == db.OrderTypeLIMIT { - return o.limitBuy - } else if side == db.OrderSideBUY && oType == db.OrderTypeSTOP { - return o.stopBuy - } else if side == db.OrderSideSELL && oType == db.OrderTypeLIMIT { - return o.limitSell - } else if side == db.OrderSideSELL && oType == db.OrderTypeSTOP { - return o.stopSell - } else { - return nil - } -} - -func (o *OrderTree) ReplaceOrInsert(order db.Order) { - o.mux.Lock() - defer o.mux.Unlock() - o.BTreeG.ReplaceOrInsert(order) -} - -func (o *OrderTree) Get(order db.Order) (db.Order, bool) { - o.mux.RLock() - defer o.mux.RUnlock() - return o.BTreeG.Get(order) -} - -func (o *OrderTree) Delete(order db.Order) { - o.mux.Lock() - defer o.mux.Unlock() - o.BTreeG.Delete(order) -} - -func (o *OrderBook) Match(price pgtype.Numeric, priceTime pgtype.Timestamptz) []*db.Order { - o.stopBuy.mux.RLock() - o.limitBuy.mux.RLock() - o.stopSell.mux.RLock() - o.limitSell.mux.RLock() - defer func() { - o.stopBuy.mux.RUnlock() - o.limitBuy.mux.RUnlock() - o.stopSell.mux.RUnlock() - o.limitSell.mux.RUnlock() - }() - - var ( - stopBuyOrder *db.Order - limitSellOrder *db.Order - limitBuyOrder *db.Order - stopSellOrder *db.Order - matchedOrder *db.Order - - expected = db.Order{ - Price: price, - CreatedAt: priceTime, - } - ) - - orderBook.Sub(db.OrderSideBUY, db.OrderTypeSTOP).AscendGreaterOrEqual(expected, func(order db.Order) bool { - stopBuyOrder = &order - return false - }) - - orderBook.Sub(db.OrderSideSELL, db.OrderTypeLIMIT).AscendGreaterOrEqual(expected, func(order db.Order) bool { - limitSellOrder = &order - return false - }) - - orderBook.Sub(db.OrderSideBUY, db.OrderTypeLIMIT).DescendLessOrEqual(expected, func(order db.Order) bool { - limitBuyOrder = &order - return false - }) - - orderBook.Sub(db.OrderSideSELL, db.OrderTypeSTOP).DescendLessOrEqual(expected, func(order db.Order) bool { - stopSellOrder = &order - return false - }) - - matchedOrder = stopBuyOrder - if limitSellOrder != nil && limitSellOrder.CreatedAt.Time.Before(matchedOrder.CreatedAt.Time) { - matchedOrder = limitSellOrder - } - if limitBuyOrder != nil && limitBuyOrder.CreatedAt.Time.Before(matchedOrder.CreatedAt.Time) { - matchedOrder = limitBuyOrder - } - if stopSellOrder != nil && stopSellOrder.CreatedAt.Time.Before(matchedOrder.CreatedAt.Time) { - matchedOrder = stopSellOrder - } - - return []*db.Order{matchedOrder} -} diff --git a/pkg/db/migration/000002_create_orders_table.up.sql b/pkg/db/migration/000002_create_orders_table.up.sql index 2a8035f..320e0be 100644 --- a/pkg/db/migration/000002_create_orders_table.up.sql +++ b/pkg/db/migration/000002_create_orders_table.up.sql @@ -12,7 +12,9 @@ CREATE TABLE IF NOT EXISTS orders ( type ORDER_TYPE NOT NULL, price NUMERIC(78,18) NOT NULL, amount NUMERIC(78,18) NOT NULL, - twap_total_time INT, + twap_amount NUMERIC(78,18), + twap_parts INT, + partial_filled_at TIMESTAMP WITH TIME ZONE, filled_at TIMESTAMP WITH TIME ZONE, cancelled_at TIMESTAMP WITH TIME ZONE, created_at TIMESTAMP WITH TIME ZONE, diff --git a/pkg/db/models.go b/pkg/db/models.go index 19e1cf9..97c2416 100644 --- a/pkg/db/models.go +++ b/pkg/db/models.go @@ -143,19 +143,21 @@ func (ns NullOrderType) Value() (driver.Value, error) { } type Order struct { - ID int64 `json:"id"` - PoolID string `json:"pool_id"` - ParentID pgtype.Int8 `json:"parent_id"` - Wallet pgtype.Text `json:"wallet"` - Status OrderStatus `json:"status"` - Side OrderSide `json:"side"` - Type OrderType `json:"type"` - Price pgtype.Numeric `json:"price"` - Amount pgtype.Numeric `json:"amount"` - TwapTotalTime pgtype.Int4 `json:"twap_total_time"` - FilledAt pgtype.Timestamptz `json:"filled_at"` - CancelledAt pgtype.Timestamptz `json:"cancelled_at"` - CreatedAt pgtype.Timestamptz `json:"created_at"` + ID int64 `json:"id"` + PoolID string `json:"pool_id"` + ParentID pgtype.Int8 `json:"parent_id"` + Wallet pgtype.Text `json:"wallet"` + Status OrderStatus `json:"status"` + Side OrderSide `json:"side"` + Type OrderType `json:"type"` + Price pgtype.Numeric `json:"price"` + Amount pgtype.Numeric `json:"amount"` + TwapAmount pgtype.Numeric `json:"twap_amount"` + TwapParts pgtype.Int4 `json:"twap_parts"` + PartialFilledAt pgtype.Timestamptz `json:"partial_filled_at"` + FilledAt pgtype.Timestamptz `json:"filled_at"` + CancelledAt pgtype.Timestamptz `json:"cancelled_at"` + CreatedAt pgtype.Timestamptz `json:"created_at"` } type Pool struct { diff --git a/pkg/db/orders.sql.go b/pkg/db/orders.sql.go index efc3502..c14e298 100644 --- a/pkg/db/orders.sql.go +++ b/pkg/db/orders.sql.go @@ -11,8 +11,47 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +const getMatchedOrder = `-- name: GetMatchedOrder :one +SELECT id, pool_id, parent_id, wallet, status, side, type, price, amount, twap_amount, twap_parts, partial_filled_at, filled_at, cancelled_at, created_at FROM orders +WHERE ( + (side = 'BUY' AND type = 'LIMIT' AND price <= $1) + OR (side = 'SELL' AND type = 'LIMIT' AND price >= $1) + OR (side = 'BUY' AND type = 'STOP' AND price >= $1) + OR (side = 'SELL' AND type 'STOP' AND price <= $1) + OR (side = 'BUY' AND type = 'TWAP' AND price <= $1) + OR (side = 'SELL' AND type = 'TWAP' AND price >= $1) + ) + AND status IN ('PENDING', 'PARTIAL_FILLED') + AND type <> 'TWAP' +ORDER BY created_at ASC +LIMIT 1 +` + +func (q *Queries) GetMatchedOrder(ctx context.Context, price pgtype.Numeric) (Order, error) { + row := q.db.QueryRow(ctx, getMatchedOrder, price) + var i Order + err := row.Scan( + &i.ID, + &i.PoolID, + &i.ParentID, + &i.Wallet, + &i.Status, + &i.Side, + &i.Type, + &i.Price, + &i.Amount, + &i.TwapAmount, + &i.TwapParts, + &i.PartialFilledAt, + &i.FilledAt, + &i.CancelledAt, + &i.CreatedAt, + ) + return i, err +} + const getOrdersByStatus = `-- name: GetOrdersByStatus :many -SELECT id, pool_id, parent_id, wallet, status, side, type, price, amount, twap_total_time, filled_at, cancelled_at, created_at FROM orders +SELECT id, pool_id, parent_id, wallet, status, side, type, price, amount, twap_amount, twap_parts, partial_filled_at, filled_at, cancelled_at, created_at FROM orders WHERE status = ANY($1::varchar[]) ` @@ -35,7 +74,9 @@ func (q *Queries) GetOrdersByStatus(ctx context.Context, status []string) ([]Ord &i.Type, &i.Price, &i.Amount, - &i.TwapTotalTime, + &i.TwapAmount, + &i.TwapParts, + &i.PartialFilledAt, &i.FilledAt, &i.CancelledAt, &i.CreatedAt, @@ -51,7 +92,7 @@ func (q *Queries) GetOrdersByStatus(ctx context.Context, status []string) ([]Ord } const getOrdersByWallet = `-- name: GetOrdersByWallet :many -SELECT o1.id, o1.pool_id, o1.parent_id, o1.wallet, o1.status, o1.side, o1.type, o1.price, o1.amount, o1.twap_total_time, o1.filled_at, o1.cancelled_at, o1.created_at, o2.id, o2.pool_id, o2.parent_id, o2.wallet, o2.status, o2.side, o2.type, o2.price, o2.amount, o2.twap_total_time, o2.filled_at, o2.cancelled_at, o2.created_at FROM orders AS o1 +SELECT o1.id, o1.pool_id, o1.parent_id, o1.wallet, o1.status, o1.side, o1.type, o1.price, o1.amount, o1.twap_amount, o1.twap_parts, o1.partial_filled_at, o1.filled_at, o1.cancelled_at, o1.created_at, o2.id, o2.pool_id, o2.parent_id, o2.wallet, o2.status, o2.side, o2.type, o2.price, o2.amount, o2.twap_amount, o2.twap_parts, o2.partial_filled_at, o2.filled_at, o2.cancelled_at, o2.created_at FROM orders AS o1 LEFT JOIN orders AS o2 ON o1.id = o2.parent_id AND o2.parent_id IS NOT NULL WHERE o1.wallet = $1 ORDER BY o1.created_at DESC @@ -65,20 +106,22 @@ type GetOrdersByWalletParams struct { } type GetOrdersByWalletRow struct { - ID int64 `json:"id"` - PoolID string `json:"pool_id"` - ParentID pgtype.Int8 `json:"parent_id"` - Wallet pgtype.Text `json:"wallet"` - Status OrderStatus `json:"status"` - Side OrderSide `json:"side"` - Type OrderType `json:"type"` - Price pgtype.Numeric `json:"price"` - Amount pgtype.Numeric `json:"amount"` - TwapTotalTime pgtype.Int4 `json:"twap_total_time"` - FilledAt pgtype.Timestamptz `json:"filled_at"` - CancelledAt pgtype.Timestamptz `json:"cancelled_at"` - CreatedAt pgtype.Timestamptz `json:"created_at"` - Order Order `json:"order"` + ID int64 `json:"id"` + PoolID string `json:"pool_id"` + ParentID pgtype.Int8 `json:"parent_id"` + Wallet pgtype.Text `json:"wallet"` + Status OrderStatus `json:"status"` + Side OrderSide `json:"side"` + Type OrderType `json:"type"` + Price pgtype.Numeric `json:"price"` + Amount pgtype.Numeric `json:"amount"` + TwapAmount pgtype.Numeric `json:"twap_amount"` + TwapParts pgtype.Int4 `json:"twap_parts"` + PartialFilledAt pgtype.Timestamptz `json:"partial_filled_at"` + FilledAt pgtype.Timestamptz `json:"filled_at"` + CancelledAt pgtype.Timestamptz `json:"cancelled_at"` + CreatedAt pgtype.Timestamptz `json:"created_at"` + Order Order `json:"order"` } func (q *Queries) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletParams) ([]GetOrdersByWalletRow, error) { @@ -100,7 +143,9 @@ func (q *Queries) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletPa &i.Type, &i.Price, &i.Amount, - &i.TwapTotalTime, + &i.TwapAmount, + &i.TwapParts, + &i.PartialFilledAt, &i.FilledAt, &i.CancelledAt, &i.CreatedAt, @@ -113,7 +158,9 @@ func (q *Queries) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletPa &i.Order.Type, &i.Order.Price, &i.Order.Amount, - &i.Order.TwapTotalTime, + &i.Order.TwapAmount, + &i.Order.TwapParts, + &i.Order.PartialFilledAt, &i.Order.FilledAt, &i.Order.CancelledAt, &i.Order.CreatedAt, @@ -129,24 +176,25 @@ func (q *Queries) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletPa } const insertOrder = `-- name: InsertOrder :one -INSERT INTO orders (parent_id, wallet, pool_id, side, status, type, price, amount, twap_total_time, filled_at, cancelled_at, created_at) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) -RETURNING id, pool_id, parent_id, wallet, status, side, type, price, amount, twap_total_time, filled_at, cancelled_at, created_at +INSERT INTO orders (parent_id, wallet, pool_id, side, status, type, price, amount, twap_amount, twap_parts, filled_at, partial_filled_at, cancelled_at) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) +RETURNING id, pool_id, parent_id, wallet, status, side, type, price, amount, twap_amount, twap_parts, partial_filled_at, filled_at, cancelled_at, created_at ` type InsertOrderParams struct { - ParentID pgtype.Int8 `json:"parent_id"` - Wallet pgtype.Text `json:"wallet"` - PoolID string `json:"pool_id"` - Side OrderSide `json:"side"` - Status OrderStatus `json:"status"` - Type OrderType `json:"type"` - Price pgtype.Numeric `json:"price"` - Amount pgtype.Numeric `json:"amount"` - TwapTotalTime pgtype.Int4 `json:"twap_total_time"` - FilledAt pgtype.Timestamptz `json:"filled_at"` - CancelledAt pgtype.Timestamptz `json:"cancelled_at"` - CreatedAt pgtype.Timestamptz `json:"created_at"` + ParentID pgtype.Int8 `json:"parent_id"` + Wallet pgtype.Text `json:"wallet"` + PoolID string `json:"pool_id"` + Side OrderSide `json:"side"` + Status OrderStatus `json:"status"` + Type OrderType `json:"type"` + Price pgtype.Numeric `json:"price"` + Amount pgtype.Numeric `json:"amount"` + TwapAmount pgtype.Numeric `json:"twap_amount"` + TwapParts pgtype.Int4 `json:"twap_parts"` + FilledAt pgtype.Timestamptz `json:"filled_at"` + PartialFilledAt pgtype.Timestamptz `json:"partial_filled_at"` + CancelledAt pgtype.Timestamptz `json:"cancelled_at"` } func (q *Queries) InsertOrder(ctx context.Context, arg InsertOrderParams) (Order, error) { @@ -159,10 +207,65 @@ func (q *Queries) InsertOrder(ctx context.Context, arg InsertOrderParams) (Order arg.Type, arg.Price, arg.Amount, - arg.TwapTotalTime, + arg.TwapAmount, + arg.TwapParts, + arg.FilledAt, + arg.PartialFilledAt, + arg.CancelledAt, + ) + var i Order + err := row.Scan( + &i.ID, + &i.PoolID, + &i.ParentID, + &i.Wallet, + &i.Status, + &i.Side, + &i.Type, + &i.Price, + &i.Amount, + &i.TwapAmount, + &i.TwapParts, + &i.PartialFilledAt, + &i.FilledAt, + &i.CancelledAt, + &i.CreatedAt, + ) + return i, err +} + +const updateOrder = `-- name: UpdateOrder :one +UPDATE orders +SET + status = COALESCE($2, status), + twap_amount = COALESCE($3, twap_amount), + filled_at = COALESCE($4, filled_at), + cancelled_at = COALESCE($5, cancelled_at), + twap_amount = COALESCE($6, twap_amount), + partial_filled_at = COALESCE($7, partial_filled_at) +WHERE id = $1 +RETURNING id, pool_id, parent_id, wallet, status, side, type, price, amount, twap_amount, twap_parts, partial_filled_at, filled_at, cancelled_at, created_at +` + +type UpdateOrderParams struct { + ID int64 `json:"id"` + Status OrderStatus `json:"status"` + TwapAmount pgtype.Numeric `json:"twap_amount"` + FilledAt pgtype.Timestamptz `json:"filled_at"` + CancelledAt pgtype.Timestamptz `json:"cancelled_at"` + TwapAmount_2 pgtype.Numeric `json:"twap_amount_2"` + PartialFilledAt pgtype.Timestamptz `json:"partial_filled_at"` +} + +func (q *Queries) UpdateOrder(ctx context.Context, arg UpdateOrderParams) (Order, error) { + row := q.db.QueryRow(ctx, updateOrder, + arg.ID, + arg.Status, + arg.TwapAmount, arg.FilledAt, arg.CancelledAt, - arg.CreatedAt, + arg.TwapAmount_2, + arg.PartialFilledAt, ) var i Order err := row.Scan( @@ -175,7 +278,9 @@ func (q *Queries) InsertOrder(ctx context.Context, arg InsertOrderParams) (Order &i.Type, &i.Price, &i.Amount, - &i.TwapTotalTime, + &i.TwapAmount, + &i.TwapParts, + &i.PartialFilledAt, &i.FilledAt, &i.CancelledAt, &i.CreatedAt, diff --git a/pkg/db/querier.go b/pkg/db/querier.go index 364cfee..d680cf0 100644 --- a/pkg/db/querier.go +++ b/pkg/db/querier.go @@ -6,6 +6,8 @@ package db import ( "context" + + "github.com/jackc/pgx/v5/pgtype" ) type Querier interface { @@ -13,6 +15,7 @@ type Querier interface { CreatePrice(ctx context.Context, arg CreatePriceParams) (Price, error) CreateToken(ctx context.Context, arg CreateTokenParams) (Token, error) GetMarketData(ctx context.Context, arg GetMarketDataParams) ([]GetMarketDataRow, error) + GetMatchedOrder(ctx context.Context, price pgtype.Numeric) (Order, error) GetOrdersByStatus(ctx context.Context, status []string) ([]Order, error) GetOrdersByWallet(ctx context.Context, arg GetOrdersByWalletParams) ([]GetOrdersByWalletRow, error) GetPool(ctx context.Context, id string) (Pool, error) @@ -22,6 +25,7 @@ type Querier interface { GetPrices(ctx context.Context, arg GetPricesParams) ([]Price, error) InsertOrder(ctx context.Context, arg InsertOrderParams) (Order, error) PoolDetails(ctx context.Context, id string) (PoolDetailsRow, error) + UpdateOrder(ctx context.Context, arg UpdateOrderParams) (Order, error) } var _ Querier = (*Queries)(nil) diff --git a/pkg/db/query/orders.sql b/pkg/db/query/orders.sql index 9c8cadd..b010a74 100644 --- a/pkg/db/query/orders.sql +++ b/pkg/db/query/orders.sql @@ -1,6 +1,6 @@ -- name: InsertOrder :one -INSERT INTO orders (parent_id, wallet, pool_id, side, status, type, price, amount, twap_total_time, filled_at, cancelled_at, created_at) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) +INSERT INTO orders (parent_id, wallet, pool_id, side, status, type, price, amount, twap_amount, twap_parts, filled_at, partial_filled_at, cancelled_at) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING *; -- name: GetOrdersByWallet :many @@ -13,3 +13,30 @@ LIMIT $2 OFFSET $3; -- name: GetOrdersByStatus :many SELECT * FROM orders WHERE status = ANY(@status::varchar[]); + +-- name: GetMatchedOrder :one +SELECT * FROM orders +WHERE ( + (side = 'BUY' AND type = 'LIMIT' AND price <= $1) + OR (side = 'SELL' AND type = 'LIMIT' AND price >= $1) + OR (side = 'BUY' AND type = 'STOP' AND price >= $1) + OR (side = 'SELL' AND type 'STOP' AND price <= $1) + OR (side = 'BUY' AND type = 'TWAP' AND price <= $1) + OR (side = 'SELL' AND type = 'TWAP' AND price >= $1) + ) + AND status IN ('PENDING', 'PARTIAL_FILLED') + AND type <> 'TWAP' +ORDER BY created_at ASC +LIMIT 1; + +-- name: UpdateOrder :one +UPDATE orders +SET + status = COALESCE($2, status), + twap_amount = COALESCE($3, twap_amount), + filled_at = COALESCE($4, filled_at), + cancelled_at = COALESCE($5, cancelled_at), + twap_amount = COALESCE($6, twap_amount), + partial_filled_at = COALESCE($7, partial_filled_at) +WHERE id = $1 +RETURNING *; From b59734dfc2781b209a0f1803b16eb9be1067fa0c Mon Sep 17 00:00:00 2001 From: TP-O Date: Thu, 27 Feb 2025 14:19:22 +0700 Subject: [PATCH 09/10] fix: fill empty val --- cmd/worker/job/price_tracker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/worker/job/price_tracker.go b/cmd/worker/job/price_tracker.go index 402f1d7..2443b10 100644 --- a/cmd/worker/job/price_tracker.go +++ b/cmd/worker/job/price_tracker.go @@ -199,7 +199,7 @@ RETRY: } func (p *PriceTracker) handleEvent(event *evm.UniswapV3Swap) error { - price := utils.CalculatePrice(event.SqrtPriceX96) + price := utils.CalculatePrice(nil, 0, 0, false) _, err := services.MatchOrder(context.Background(), price.String()) if err != nil { log.Info().Any("event", event).Err(err).Msgf("[PriceTracker] [HandleEvent] failed to match order") From 1c44626282676947ee8c90ffd846e122966a2da7 Mon Sep 17 00:00:00 2001 From: TP-O Date: Thu, 27 Feb 2025 19:05:24 +0700 Subject: [PATCH 10/10] fix: conflict --- app.compose.yml | 2 +- cmd/api/server/routes.go | 2 ++ internal/orders/handlers/list.go.go | 4 ++-- internal/orders/route.go | 6 +++--- internal/orders/services/order.go | 10 ++++++---- 5 files changed, 14 insertions(+), 10 deletions(-) diff --git a/app.compose.yml b/app.compose.yml index 3e3360f..2e1f913 100644 --- a/app.compose.yml +++ b/app.compose.yml @@ -24,7 +24,7 @@ services: volumes: - dexon_openobserve_data:/data api: - image: dexon-api + image: dexon-api container_name: dexon-api build: context: . diff --git a/cmd/api/server/routes.go b/cmd/api/server/routes.go index ce9c643..9b5b0f4 100644 --- a/cmd/api/server/routes.go +++ b/cmd/api/server/routes.go @@ -4,6 +4,7 @@ import ( "github.com/labstack/echo/v4" "github.com/zuni-lab/dexon-service/internal/chat" "github.com/zuni-lab/dexon-service/internal/health" + "github.com/zuni-lab/dexon-service/internal/orders" "github.com/zuni-lab/dexon-service/internal/pools" ) @@ -12,4 +13,5 @@ func setupRoute(e *echo.Echo) { health.Route(e, "/health") pools.Route(api, "/pools") chat.Route(api, "/chat") + orders.Route(api, "/orders") } diff --git a/internal/orders/handlers/list.go.go b/internal/orders/handlers/list.go.go index cbaf160..1627451 100644 --- a/internal/orders/handlers/list.go.go +++ b/internal/orders/handlers/list.go.go @@ -16,10 +16,10 @@ func List(c echo.Context) error { return err } - candlestick, err := services.ListOrderByWallet(ctx, query) + orders, err := services.ListOrderByWallet(ctx, query) if err != nil { return echo.NewHTTPError(http.StatusInternalServerError, err) } - return c.JSON(http.StatusOK, candlestick) + return c.JSON(http.StatusOK, orders) } diff --git a/internal/orders/route.go b/internal/orders/route.go index 5b7ca3e..4738ddb 100644 --- a/internal/orders/route.go +++ b/internal/orders/route.go @@ -1,4 +1,4 @@ -package prices +package orders import ( "github.com/labstack/echo/v4" @@ -17,6 +17,6 @@ func Route(g *echo.Group, path string) { ordersGroup := g.Group(path, middleware) - ordersGroup.GET("/", handlers.List) - ordersGroup.POST("/", handlers.Create) + ordersGroup.GET("", handlers.List) + ordersGroup.POST("", handlers.Create) } diff --git a/internal/orders/services/order.go b/internal/orders/services/order.go index 77292f3..3f45028 100644 --- a/internal/orders/services/order.go +++ b/internal/orders/services/order.go @@ -11,9 +11,9 @@ import ( ) type ListOrdersByWalletQuery struct { - Wallet string `json:"wallet" validate:"eth_addr"` - Limit int32 `json:"limit"` - Offset int32 `json:"offset"` + Wallet string `query:"wallet" validate:"eth_addr"` + Limit int32 `query:"limit" validate:"gt=0"` + Offset int32 `query:"offset" validate:"gte=0"` } type ListOrdersByWalletResponseItem struct { @@ -34,7 +34,7 @@ func ListOrderByWallet(ctx context.Context, query ListOrdersByWalletQuery) ([]Li var ( item ListOrdersByWalletResponseItem - res []ListOrdersByWalletResponseItem + res = []ListOrdersByWalletResponseItem{} ) for _, order := range orders { if idx := slices.IndexFunc(res, func(item ListOrdersByWalletResponseItem) bool { @@ -86,6 +86,8 @@ func CreateOrder(ctx context.Context, body CreateOrderBody) (*db.Order, error) { if params.Type == db.OrderTypeMARKET { params.Status = db.OrderStatusFILLED _ = params.FilledAt.Scan(time.Now()) + } else { + params.Status = db.OrderStatusPENDING } order, err := db.DB.InsertOrder(ctx, params)