Skip to content

feat: store order #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ migrate-up:

migrate-down:
migrate -path pkg/db/migration -database "$(POSTGRES_URL)" -verbose down

new-migration:
migrate create -ext sql -dir pkg/db/migration -seq $(name)

Expand Down
2 changes: 1 addition & 1 deletion app.compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ services:
volumes:
- dexon_openobserve_data:/data
api:
image: dexon-api
image: dexon-api
container_name: dexon-api
build:
context: .
Expand Down
2 changes: 2 additions & 0 deletions cmd/api/server/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/labstack/echo/v4"
"github.com/zuni-lab/dexon-service/internal/chat"
"github.com/zuni-lab/dexon-service/internal/health"
"github.com/zuni-lab/dexon-service/internal/orders"
"github.com/zuni-lab/dexon-service/internal/pools"
)

Expand All @@ -12,4 +13,5 @@ func setupRoute(e *echo.Echo) {
health.Route(e, "/health")
pools.Route(api, "/pools")
chat.Route(api, "/chat")
orders.Route(api, "/orders")
}
210 changes: 210 additions & 0 deletions cmd/worker/job/price_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package job

import (
"context"
"fmt"
"github.com/zuni-lab/dexon-service/internal/orders/services"
"github.com/zuni-lab/dexon-service/pkg/utils"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/rs/zerolog/log"
"github.com/zuni-lab/dexon-service/config"
"github.com/zuni-lab/dexon-service/pkg/evm"
)

type PriceTracker struct {
client *ethclient.Client
backoff *backoff.ExponentialBackOff
maxAttempts uint64
}

func NewPriceTracker() *PriceTracker {
b := backoff.NewExponentialBackOff()
b.InitialInterval = 1 * time.Second
b.MaxInterval = 1 * time.Minute
b.MaxElapsedTime = 30 * time.Minute

return &PriceTracker{
backoff: b,
maxAttempts: 100,
}
}

func (p *PriceTracker) Start(ctx context.Context) error {
for {
if err := p.connect(); err != nil {
log.Error().Err(err).Msg("Failed to connect to Ethereum client")
continue
}

if err := p.WatchPools(ctx); err != nil {
log.Error().Err(err).Msg("Error watching pools")
if p.client != nil {
p.client.Close()
p.client = nil
}
continue
}

return nil
}
}

func (p *PriceTracker) connect() error {
url := strings.Replace(config.Env.AlchemyUrl, "https", "wss", 1)

var client *ethclient.Client
var err error

operation := func() error {
log.Info().Msg("Attempting to connect to Ethereum client...")
client, err = ethclient.Dial(url)
if err != nil {
return fmt.Errorf("failed to connect: %w", err)
}
return nil
}

if err := backoff.Retry(operation, p.backoff); err != nil {
return err
}

p.client = client
return nil
}

func (p *PriceTracker) WatchPools(ctx context.Context) error {
pools := []common.Address{
common.HexToAddress("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640"),
common.HexToAddress("0xc7bbec68d12a0d1830360f8ec58fa599ba1b0e9b"),
}

errChan := make(chan error, len(pools))
var wg sync.WaitGroup

for _, pool := range pools {
wg.Add(1)
go func(pool common.Address) {
defer wg.Done()
if err := p.WatchPool(ctx, pool); err != nil {
select {
case errChan <- fmt.Errorf("pool %s: %w", pool.Hex(), err):
default:
}
}
}(pool)
}

go func() {
wg.Wait()
close(errChan)
}()

select {
case <-ctx.Done():
return ctx.Err()
case err := <-errChan:
return err
}
}

func (p *PriceTracker) WatchPool(ctx context.Context, pool common.Address) error {
for {
if err := p.watchPoolWithRetry(ctx, pool); err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
log.Error().
Err(err).
Str("pool", pool.Hex()).
Msg("Error watching pool, will retry")
continue
}
return nil
}
}

func (p *PriceTracker) watchPoolWithRetry(ctx context.Context, pool common.Address) error {
log.Info().Msgf("Watching pool %s", pool.Hex())
attempt := 0
RETRY:
for attempt < int(p.maxAttempts) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

contract, err := evm.NewUniswapV3(pool, p.client)
if err != nil {
return fmt.Errorf("failed to create contract instance: %w", err)
}

watchOpts := &bind.WatchOpts{Context: ctx}
sink := make(chan *evm.UniswapV3Swap)

sub, err := contract.WatchSwap(watchOpts, sink, nil, nil)
if err != nil {
attempt++
nextBackoff := p.backoff.NextBackOff()
if nextBackoff == backoff.Stop {
return fmt.Errorf("max elapsed time reached after %d attempts", attempt)
}
log.Warn().
Err(err).
Int("attempt", attempt).
Dur("backoff", nextBackoff).
Str("pool", pool.Hex()).
Msg("Failed to watch swaps, retrying...")
time.Sleep(nextBackoff)
continue
}
defer sub.Unsubscribe()

attempt = 0
p.backoff.Reset()

log.Info().Msg("🚀 Ready to watch pool")

for {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-sub.Err():
if err != nil {
log.Warn().
Err(err).
Str("pool", pool.Hex()).
Msg("Subscription error, reconnecting...")
goto RETRY
}
case event := <-sink:
if err := p.handleEvent(event); err != nil {
log.Error().
Err(err).
Str("pool", pool.Hex()).
Msg("Error handling event")
}
}
}
}

return fmt.Errorf("failed to maintain subscription after %d attempts", p.maxAttempts)
}

func (p *PriceTracker) handleEvent(event *evm.UniswapV3Swap) error {
price := utils.CalculatePrice(nil, 0, 0, false)
_, err := services.MatchOrder(context.Background(), price.String())
if err != nil {
log.Info().Any("event", event).Err(err).Msgf("[PriceTracker] [HandleEvent] failed to match order")
}

log.Info().Any("event", event).Msgf("[PriceTracker] [HandleEvent] handled %s event", event.Raw.Address.Hex())
return nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ require (
github.com/ethereum/go-ethereum v1.15.2
github.com/go-playground/validator/v10 v10.24.0
github.com/golang-migrate/migrate/v4 v4.18.2
github.com/google/btree v1.1.3
github.com/jackc/pgx/v5 v5.7.2
github.com/jedib0t/go-pretty/v6 v6.6.6
github.com/jinzhu/copier v0.4.0
github.com/joho/godotenv v1.5.1
github.com/labstack/echo/v4 v4.13.3
github.com/openai/openai-go v0.1.0-alpha.59
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk=
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
Expand Down Expand Up @@ -141,6 +143,8 @@ github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7Bd
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jedib0t/go-pretty/v6 v6.6.6 h1:LyezkL+1SuqH2z47e5IMQkYUIcs2BD+MnpdPRiRcN0c=
github.com/jedib0t/go-pretty/v6 v6.6.6/go.mod h1:YwC5CE4fJ1HFUDeivSV1r//AmANFHyqczZk+U6BDALU=
github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
Expand Down
25 changes: 25 additions & 0 deletions internal/orders/handlers/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package handlers

import (
"github.com/zuni-lab/dexon-service/internal/orders/services"
"net/http"

"github.com/labstack/echo/v4"
"github.com/zuni-lab/dexon-service/pkg/utils"
)

func Create(c echo.Context) error {
ctx := c.Request().Context()

var body services.CreateOrderBody
if err := utils.BindAndValidate(c, &body); err != nil {
return err
}

order, err := services.CreateOrder(ctx, body)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err)
}

return c.JSON(http.StatusOK, order)
}
25 changes: 25 additions & 0 deletions internal/orders/handlers/list.go.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package handlers

import (
"github.com/zuni-lab/dexon-service/internal/orders/services"
"net/http"

"github.com/labstack/echo/v4"
"github.com/zuni-lab/dexon-service/pkg/utils"
)

func List(c echo.Context) error {
ctx := c.Request().Context()

var query services.ListOrdersByWalletQuery
if err := utils.BindAndValidate(c, &query); err != nil {
return err
}

orders, err := services.ListOrderByWallet(ctx, query)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err)
}

return c.JSON(http.StatusOK, orders)
}
22 changes: 22 additions & 0 deletions internal/orders/route.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package orders

import (
"github.com/labstack/echo/v4"
"github.com/zuni-lab/dexon-service/internal/orders/handlers"
)

func Route(g *echo.Group, path string) {
// TODO: add middleware here

middleware := echo.MiddlewareFunc(func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
// TODO: add middleware here
return next(c)
}
})

ordersGroup := g.Group(path, middleware)

ordersGroup.GET("", handlers.List)
ordersGroup.POST("", handlers.Create)
}
Loading
Loading