Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5c601fe
Use HTTP instead of WebSocket for Ogmios
marcusbfs Feb 5, 2025
14a6d9d
Remove mempool methods from `QueryM.Ogmios`
marcusbfs Feb 5, 2025
12896f5
Fix warning
marcusbfs Feb 5, 2025
219f497
Use HTTP version of `evaluateTx`
marcusbfs Feb 5, 2025
141a59b
Extract Ogmios types into separate module
marcusbfs Feb 5, 2025
83b2c49
Clean and simplify Ogmios related code
marcusbfs Feb 5, 2025
5f963f1
Move mempool related types to `Ogmios.Mempool`
marcusbfs Feb 5, 2025
5c2d73f
Fix IPV6 parser
marcusbfs Feb 5, 2025
2c0c04d
Move `DecodeOgmios` to `Ogmios.Types`
marcusbfs Feb 5, 2025
af6d6f5
Rename `Ogmios.Queries` to `Ogmios.QueryEnv`
marcusbfs Feb 5, 2025
fb55251
Remove `uniqueId` and `ServerConfig` dependencies from Ogmios.Mempool
marcusbfs Feb 5, 2025
ea54320
Extract common configuration out of `Ogmios.Mempool`
marcusbfs Feb 5, 2025
2dcaa81
Remove unused exports
marcusbfs Feb 6, 2025
2d2196f
Omit "id" field for Ogmios HTTP request/response
marcusbfs Feb 7, 2025
2c8db1f
Simplify error handling
marcusbfs Feb 7, 2025
796868f
Reuse `aesonObject`
marcusbfs Feb 7, 2025
9a214a8
Simplify error handling
marcusbfs Feb 10, 2025
f8fea7c
Add `HttpUtils`
marcusbfs Feb 10, 2025
adba5b1
Clean duplicated code
marcusbfs Feb 10, 2025
febc66a
Remove websocket runtime from `QueryEnv`
marcusbfs Feb 13, 2025
06a5955
Refactor mempool code structure
marcusbfs Feb 13, 2025
99b98a0
Remove `resendPendingSubmitRequests` in websocket internals
marcusbfs Feb 13, 2025
946c443
Remove internal CTL helper functions from mempool
marcusbfs Feb 14, 2025
2718d8a
Remove unused code
marcusbfs Feb 14, 2025
303e7f4
Refactor `OgmiosDecodeError`
marcusbfs Feb 18, 2025
ace529e
Improve HTTP handler readability
marcusbfs Feb 18, 2025
d8ba070
Update changelog
marcusbfs Feb 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Contract/Backend/Ogmios.purs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import Cardano.Types.CborBytes (CborBytes)
import Cardano.Types.TransactionHash (TransactionHash)
import Contract.Monad (Contract)
import Ctl.Internal.Contract.Monad (wrapQueryM)
import Ctl.Internal.QueryM (submitTxOgmios) as QueryM
import Ctl.Internal.QueryM.Ogmios (SubmitTxR)
import Ctl.Internal.QueryM.Ogmios (submitTxOgmios) as Ogmios
import Ctl.Internal.QueryM.Ogmios.Types (SubmitTxR)
import Ctl.Internal.QueryM.Pools (getPoolParameters) as QueryM

-- | **This function can only run with Ogmios backend**
Expand All @@ -26,4 +26,4 @@ getPoolParameters = wrapQueryM <<< QueryM.getPoolParameters

-- | Error returning variant
submitTxE :: TransactionHash -> CborBytes -> Contract SubmitTxR
submitTxE txhash cbor = wrapQueryM $ QueryM.submitTxOgmios txhash cbor
submitTxE txhash cbor = wrapQueryM $ Ogmios.submitTxOgmios txhash cbor
204 changes: 151 additions & 53 deletions src/Contract/Backend/Ogmios/Mempool.purs
Original file line number Diff line number Diff line change
Expand Up @@ -2,86 +2,102 @@
-- | These functions only work with Ogmios backend (not Blockfrost!).
-- | https://ogmios.dev/mini-protocols/local-tx-monitor/
module Contract.Backend.Ogmios.Mempool
( module Ogmios
, acquireMempoolSnapshot
, fetchMempoolTxs
( acquireMempoolSnapshot
, mempoolSnapshotHasTx
, mempoolSnapshotNextTx
, fetchMempoolTxs
, mempoolSnapshotSizeAndCapacity
, releaseMempool
, withMempoolSnapshot
, MempoolEnv
, MempoolMT(MempoolMT)
, MempoolM
) where

import Contract.Prelude

import Cardano.AsCbor (decodeCbor)
import Cardano.Types.Transaction (Transaction)
import Cardano.Types.TransactionHash (TransactionHash)
import Contract.Monad (Contract)
import Control.Monad.Error.Class (liftMaybe, try)
import Ctl.Internal.Contract.Monad (wrapQueryM)
import Ctl.Internal.QueryM
( acquireMempoolSnapshot
, mempoolSnapshotHasTx
, mempoolSnapshotNextTx
, mempoolSnapshotSizeAndCapacity
, releaseMempool
) as QueryM
import Ctl.Internal.QueryM.Ogmios
( MempoolSizeAndCapacity(MempoolSizeAndCapacity)
import Control.Monad.Error.Class
( class MonadError
, class MonadThrow
, liftMaybe
, try
)
import Control.Monad.Reader.Class (class MonadAsk)
import Control.Monad.Reader.Trans (ReaderT(ReaderT), asks)
import Ctl.Internal.Logging (Logger, mkLogger)
import Ctl.Internal.QueryM.Ogmios.Mempool
( ListenerSet
, OgmiosListeners
, OgmiosWebSocket
, acquireMempoolSnapshotCall
, listeners
, mempoolSnapshotHasTxCall
, mempoolSnapshotNextTxCall
, mempoolSnapshotSizeAndCapacityCall
, mkRequestAff
, releaseMempoolCall
, underlyingWebSocket
)
import Ctl.Internal.QueryM.Ogmios.Mempool
( MempoolSizeAndCapacity
, MempoolSnapshotAcquired
, MempoolTransaction(MempoolTransaction)
) as Ogmios
import Ctl.Internal.QueryM.Ogmios.Mempool.JsWebSocket (JsWebSocket)
import Ctl.Internal.QueryM.Ogmios.Mempool.JsonRpc2 as JsonRpc2
import Data.Array as Array
import Data.ByteArray (hexToByteArray)
import Data.List (List(Cons))
import Data.Maybe (Maybe(Just, Nothing))
import Effect.Exception (error)
import Data.Log.Level (LogLevel)
import Data.Log.Message (Message)
import Data.Maybe (Maybe)
import Data.Newtype (class Newtype, unwrap)
import Effect.Aff (Aff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (class MonadEffect)
import Effect.Exception (Error, error)

-- | Establish a connection with the Local TX Monitor.
-- | Instantly accquires the current mempool snapshot, and will wait for the next
-- | mempool snapshot if used again before using `releaseMempool`.
acquireMempoolSnapshot :: Contract Ogmios.MempoolSnapshotAcquired
acquireMempoolSnapshot = wrapQueryM QueryM.acquireMempoolSnapshot
----------------
-- Mempool monad
----------------

-- | Check to see if a TxHash is present in the current mempool snapshot.
mempoolSnapshotHasTx
:: Ogmios.MempoolSnapshotAcquired -> TransactionHash -> Contract Boolean
mempoolSnapshotHasTx ms = wrapQueryM <<< QueryM.mempoolSnapshotHasTx ms
type MempoolEnv =
{ ogmiosWs :: OgmiosWebSocket
, logLevel :: LogLevel
, customLogger :: Maybe (LogLevel -> Message -> Aff Unit)
, suppressLogs :: Boolean
}

-- | Get the first received TX in the current mempool snapshot. This function can
-- | be recursively called to traverse the finger-tree of the mempool data set.
-- | This will return `Nothing` once it has reached the end of the current mempool.
mempoolSnapshotNextTx
:: Ogmios.MempoolSnapshotAcquired
-> Contract (Maybe Transaction)
mempoolSnapshotNextTx mempoolAcquired = do
mbTx <- wrapQueryM $ QueryM.mempoolSnapshotNextTx mempoolAcquired
for mbTx \(Ogmios.MempoolTransaction { raw }) -> do
byteArray <- liftMaybe (error "Failed to decode transaction")
$ hexToByteArray raw
liftMaybe (error "Failed to decode tx")
$ decodeCbor
$ wrap byteArray
type MempoolM = MempoolMT Aff

-- | The acquired snapshot’s size (in bytes), number of transactions, and
-- | capacity (in bytes).
mempoolSnapshotSizeAndCapacity
:: Ogmios.MempoolSnapshotAcquired -> Contract Ogmios.MempoolSizeAndCapacity
mempoolSnapshotSizeAndCapacity = wrapQueryM <<<
QueryM.mempoolSnapshotSizeAndCapacity
newtype MempoolMT (m :: Type -> Type) (a :: Type) =
MempoolMT (ReaderT MempoolEnv m a)

-- | Release the connection to the Local TX Monitor.
releaseMempool
:: Ogmios.MempoolSnapshotAcquired -> Contract Unit
releaseMempool = wrapQueryM <<< QueryM.releaseMempool
derive instance Newtype (MempoolMT m a) _
derive newtype instance Functor m => Functor (MempoolMT m)
derive newtype instance Apply m => Apply (MempoolMT m)
derive newtype instance Applicative m => Applicative (MempoolMT m)
derive newtype instance Bind m => Bind (MempoolMT m)
derive newtype instance Monad (MempoolMT Aff)
derive newtype instance MonadEffect (MempoolMT Aff)
derive newtype instance MonadAff (MempoolMT Aff)
derive newtype instance MonadThrow Error (MempoolMT Aff)
derive newtype instance MonadError Error (MempoolMT Aff)
derive newtype instance MonadAsk MempoolEnv (MempoolMT Aff)

--------------------
-- Mempool functions
--------------------

-- | A bracket-style function for working with mempool snapshots - ensures
-- | release in the presence of exceptions
withMempoolSnapshot
:: forall a
. (Ogmios.MempoolSnapshotAcquired -> Contract a)
-> Contract a
. (Ogmios.MempoolSnapshotAcquired -> MempoolM a)
-> MempoolM a
withMempoolSnapshot f = do
s <- acquireMempoolSnapshot
res <- try $ f s
Expand All @@ -92,11 +108,93 @@ withMempoolSnapshot f = do
-- | respond with a new TX.
fetchMempoolTxs
:: Ogmios.MempoolSnapshotAcquired
-> Contract (Array Transaction)
-> MempoolM (Array Transaction)
fetchMempoolTxs ms = Array.fromFoldable <$> go
where
go = do
nextTX <- mempoolSnapshotNextTx ms
case nextTX of
Just tx -> Cons tx <$> go
Nothing -> pure mempty

acquireMempoolSnapshot
:: MempoolM Ogmios.MempoolSnapshotAcquired
acquireMempoolSnapshot =
mkOgmiosRequest
acquireMempoolSnapshotCall
_.acquireMempool
unit

mempoolSnapshotHasTx
:: Ogmios.MempoolSnapshotAcquired
-> TransactionHash
-> MempoolM Boolean
mempoolSnapshotHasTx ms txh =
unwrap <$> mkOgmiosRequest
(mempoolSnapshotHasTxCall ms)
_.mempoolHasTx
txh

mempoolSnapshotSizeAndCapacity
:: Ogmios.MempoolSnapshotAcquired
-> MempoolM Ogmios.MempoolSizeAndCapacity
mempoolSnapshotSizeAndCapacity ms =
mkOgmiosRequest
(mempoolSnapshotSizeAndCapacityCall ms)
_.mempoolSizeAndCapacity
unit

releaseMempool
:: Ogmios.MempoolSnapshotAcquired
-> MempoolM Unit
releaseMempool ms =
unit <$ mkOgmiosRequest
(releaseMempoolCall ms)
_.releaseMempool
unit

mempoolSnapshotNextTx
:: Ogmios.MempoolSnapshotAcquired
-> MempoolM (Maybe Transaction)
mempoolSnapshotNextTx ms = do
mbTx <- unwrap <$> mkOgmiosRequest
(mempoolSnapshotNextTxCall ms)
_.mempoolNextTx
unit
for mbTx \(Ogmios.MempoolTransaction { raw }) -> do
byteArray <- liftMaybe (error "Failed to decode transaction")
$ hexToByteArray raw
liftMaybe (error "Failed to decode tx")
$ decodeCbor
$ wrap byteArray

-- | Builds an Ogmios request action using `MempoolM`
mkOgmiosRequest
:: forall (request :: Type) (response :: Type)
. JsonRpc2.JsonRpc2Call request response
-> (OgmiosListeners -> ListenerSet request response)
-> request
-> MempoolM response
mkOgmiosRequest jsonRpc2Call getLs inp = do
listeners' <- asks $ listeners <<< _.ogmiosWs
websocket <- asks $ underlyingWebSocket <<< _.ogmiosWs
mkRequest listeners' websocket jsonRpc2Call getLs inp

mkRequest
:: forall (request :: Type) (response :: Type) (listeners :: Type)
. listeners
-> JsWebSocket
-> JsonRpc2.JsonRpc2Call request response
-> (listeners -> ListenerSet request response)
-> request
-> MempoolM response
mkRequest listeners' ws jsonRpc2Call getLs inp = do
logger <- getLogger
liftAff $ mkRequestAff listeners' ws logger jsonRpc2Call getLs inp
where
getLogger :: MempoolM Logger
getLogger = do
logLevel <- asks $ _.logLevel
mbCustomLogger <- asks $ _.customLogger
pure $ mkLogger logLevel mbCustomLogger

2 changes: 1 addition & 1 deletion src/Contract/Time.purs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import Control.Monad.Reader.Class (asks)
import Ctl.Internal.Contract (getChainTip)
import Ctl.Internal.Contract.Monad (getProvider)
import Ctl.Internal.Helpers (liftM)
import Ctl.Internal.QueryM.Ogmios
import Ctl.Internal.QueryM.Ogmios.Types
( CurrentEpoch(CurrentEpoch)
, OgmiosEraSummaries(OgmiosEraSummaries)
) as ExportOgmios
Expand Down
2 changes: 1 addition & 1 deletion src/Internal/BalanceTx/ExUnitsAndMinFee.purs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import Ctl.Internal.BalanceTx.Types
import Ctl.Internal.Contract.MinFee (calculateMinFee) as Contract.MinFee
import Ctl.Internal.Contract.Monad (getProvider)
import Ctl.Internal.Helpers (liftEither, unsafeFromJust)
import Ctl.Internal.QueryM.Ogmios (AdditionalUtxoSet) as Ogmios
import Ctl.Internal.QueryM.Ogmios.Types (AdditionalUtxoSet) as Ogmios
import Ctl.Internal.Transaction (setScriptDataHash)
import Ctl.Internal.TxOutput
( transactionInputToTxOutRef
Expand Down
Loading