22-- | These functions only work with Ogmios backend (not Blockfrost!).
33-- | https://ogmios.dev/mini-protocols/local-tx-monitor/
44module Contract.Backend.Ogmios.Mempool
5- ( module Ogmios
6- , acquireMempoolSnapshot
7- , fetchMempoolTxs
5+ ( acquireMempoolSnapshot
86 , mempoolSnapshotHasTx
97 , mempoolSnapshotNextTx
8+ , fetchMempoolTxs
109 , mempoolSnapshotSizeAndCapacity
1110 , releaseMempool
1211 , withMempoolSnapshot
12+ , MempoolEnv
13+ , MempoolMT (MempoolMT)
14+ , MempoolM
1315 ) where
1416
1517import Contract.Prelude
1618
1719import Cardano.AsCbor (decodeCbor )
1820import Cardano.Types.Transaction (Transaction )
1921import Cardano.Types.TransactionHash (TransactionHash )
20- import Contract.Monad (Contract )
21- import Control.Monad.Error.Class (liftMaybe , try )
22- import Ctl.Internal.Contract.Monad (wrapQueryM )
23- import Ctl.Internal.QueryM
24- ( acquireMempoolSnapshot
25- , mempoolSnapshotHasTx
26- , mempoolSnapshotNextTx
27- , mempoolSnapshotSizeAndCapacity
28- , releaseMempool
29- ) as QueryM
30- import Ctl.Internal.QueryM.Ogmios
31- ( MempoolSizeAndCapacity (MempoolSizeAndCapacity)
22+ import Control.Monad.Error.Class
23+ ( class MonadError
24+ , class MonadThrow
25+ , liftMaybe
26+ , try
27+ )
28+ import Control.Monad.Reader.Class (class MonadAsk )
29+ import Control.Monad.Reader.Trans (ReaderT (ReaderT), asks )
30+ import Ctl.Internal.Logging (Logger , mkLogger )
31+ import Ctl.Internal.QueryM.Ogmios.Mempool
32+ ( ListenerSet
33+ , OgmiosListeners
34+ , OgmiosWebSocket
35+ , acquireMempoolSnapshotCall
36+ , listeners
37+ , mempoolSnapshotHasTxCall
38+ , mempoolSnapshotNextTxCall
39+ , mempoolSnapshotSizeAndCapacityCall
40+ , mkRequestAff
41+ , releaseMempoolCall
42+ , underlyingWebSocket
43+ )
44+ import Ctl.Internal.QueryM.Ogmios.Mempool
45+ ( MempoolSizeAndCapacity
3246 , MempoolSnapshotAcquired
3347 , MempoolTransaction (MempoolTransaction)
3448 ) as Ogmios
49+ import Ctl.Internal.QueryM.Ogmios.Mempool.JsWebSocket (JsWebSocket )
50+ import Ctl.Internal.QueryM.Ogmios.Mempool.JsonRpc2 as JsonRpc2
3551import Data.Array as Array
3652import Data.ByteArray (hexToByteArray )
3753import Data.List (List (Cons))
38- import Data.Maybe (Maybe (Just, Nothing))
39- import Effect.Exception (error )
54+ import Data.Log.Level (LogLevel )
55+ import Data.Log.Message (Message )
56+ import Data.Maybe (Maybe )
57+ import Data.Newtype (class Newtype , unwrap )
58+ import Effect.Aff (Aff )
59+ import Effect.Aff.Class (class MonadAff , liftAff )
60+ import Effect.Class (class MonadEffect )
61+ import Effect.Exception (Error , error )
4062
41- -- | Establish a connection with the Local TX Monitor.
42- -- | Instantly accquires the current mempool snapshot, and will wait for the next
43- -- | mempool snapshot if used again before using `releaseMempool`.
44- acquireMempoolSnapshot :: Contract Ogmios.MempoolSnapshotAcquired
45- acquireMempoolSnapshot = wrapQueryM QueryM .acquireMempoolSnapshot
63+ -- --------------
64+ -- Mempool monad
65+ -- --------------
4666
47- -- | Check to see if a TxHash is present in the current mempool snapshot.
48- mempoolSnapshotHasTx
49- :: Ogmios.MempoolSnapshotAcquired -> TransactionHash -> Contract Boolean
50- mempoolSnapshotHasTx ms = wrapQueryM <<< QueryM .mempoolSnapshotHasTx ms
67+ type MempoolEnv =
68+ { ogmiosWs :: OgmiosWebSocket
69+ , logLevel :: LogLevel
70+ , customLogger :: Maybe (LogLevel -> Message -> Aff Unit )
71+ , suppressLogs :: Boolean
72+ }
5173
52- -- | Get the first received TX in the current mempool snapshot. This function can
53- -- | be recursively called to traverse the finger-tree of the mempool data set.
54- -- | This will return `Nothing` once it has reached the end of the current mempool.
55- mempoolSnapshotNextTx
56- :: Ogmios.MempoolSnapshotAcquired
57- -> Contract (Maybe Transaction )
58- mempoolSnapshotNextTx mempoolAcquired = do
59- mbTx <- wrapQueryM $ QueryM .mempoolSnapshotNextTx mempoolAcquired
60- for mbTx \(Ogmios.MempoolTransaction { raw }) -> do
61- byteArray <- liftMaybe (error " Failed to decode transaction" )
62- $ hexToByteArray raw
63- liftMaybe (error " Failed to decode tx" )
64- $ decodeCbor
65- $ wrap byteArray
74+ type MempoolM = MempoolMT Aff
6675
67- -- | The acquired snapshot’s size (in bytes), number of transactions, and
68- -- | capacity (in bytes).
69- mempoolSnapshotSizeAndCapacity
70- :: Ogmios.MempoolSnapshotAcquired -> Contract Ogmios.MempoolSizeAndCapacity
71- mempoolSnapshotSizeAndCapacity = wrapQueryM <<<
72- QueryM .mempoolSnapshotSizeAndCapacity
76+ newtype MempoolMT (m :: Type -> Type ) (a :: Type ) =
77+ MempoolMT (ReaderT MempoolEnv m a )
7378
74- -- | Release the connection to the Local TX Monitor.
75- releaseMempool
76- :: Ogmios.MempoolSnapshotAcquired -> Contract Unit
77- releaseMempool = wrapQueryM <<< QueryM .releaseMempool
79+ derive instance Newtype (MempoolMT m a ) _
80+ derive newtype instance Functor m => Functor (MempoolMT m )
81+ derive newtype instance Apply m => Apply (MempoolMT m )
82+ derive newtype instance Applicative m => Applicative (MempoolMT m )
83+ derive newtype instance Bind m => Bind (MempoolMT m )
84+ derive newtype instance Monad (MempoolMT Aff )
85+ derive newtype instance MonadEffect (MempoolMT Aff )
86+ derive newtype instance MonadAff (MempoolMT Aff )
87+ derive newtype instance MonadThrow Error (MempoolMT Aff )
88+ derive newtype instance MonadError Error (MempoolMT Aff )
89+ derive newtype instance MonadAsk MempoolEnv (MempoolMT Aff )
90+
91+ -- ------------------
92+ -- Mempool functions
93+ -- ------------------
7894
7995-- | A bracket-style function for working with mempool snapshots - ensures
8096-- | release in the presence of exceptions
8197withMempoolSnapshot
8298 :: forall a
83- . (Ogmios.MempoolSnapshotAcquired -> Contract a )
84- -> Contract a
99+ . (Ogmios.MempoolSnapshotAcquired -> MempoolM a )
100+ -> MempoolM a
85101withMempoolSnapshot f = do
86102 s <- acquireMempoolSnapshot
87103 res <- try $ f s
@@ -92,11 +108,93 @@ withMempoolSnapshot f = do
92108-- | respond with a new TX.
93109fetchMempoolTxs
94110 :: Ogmios.MempoolSnapshotAcquired
95- -> Contract (Array Transaction )
111+ -> MempoolM (Array Transaction )
96112fetchMempoolTxs ms = Array .fromFoldable <$> go
97113 where
98114 go = do
99115 nextTX <- mempoolSnapshotNextTx ms
100116 case nextTX of
101117 Just tx -> Cons tx <$> go
102118 Nothing -> pure mempty
119+
120+ acquireMempoolSnapshot
121+ :: MempoolM Ogmios.MempoolSnapshotAcquired
122+ acquireMempoolSnapshot =
123+ mkOgmiosRequest
124+ acquireMempoolSnapshotCall
125+ _.acquireMempool
126+ unit
127+
128+ mempoolSnapshotHasTx
129+ :: Ogmios.MempoolSnapshotAcquired
130+ -> TransactionHash
131+ -> MempoolM Boolean
132+ mempoolSnapshotHasTx ms txh =
133+ unwrap <$> mkOgmiosRequest
134+ (mempoolSnapshotHasTxCall ms)
135+ _.mempoolHasTx
136+ txh
137+
138+ mempoolSnapshotSizeAndCapacity
139+ :: Ogmios.MempoolSnapshotAcquired
140+ -> MempoolM Ogmios.MempoolSizeAndCapacity
141+ mempoolSnapshotSizeAndCapacity ms =
142+ mkOgmiosRequest
143+ (mempoolSnapshotSizeAndCapacityCall ms)
144+ _.mempoolSizeAndCapacity
145+ unit
146+
147+ releaseMempool
148+ :: Ogmios.MempoolSnapshotAcquired
149+ -> MempoolM Unit
150+ releaseMempool ms =
151+ unit <$ mkOgmiosRequest
152+ (releaseMempoolCall ms)
153+ _.releaseMempool
154+ unit
155+
156+ mempoolSnapshotNextTx
157+ :: Ogmios.MempoolSnapshotAcquired
158+ -> MempoolM (Maybe Transaction )
159+ mempoolSnapshotNextTx ms = do
160+ mbTx <- unwrap <$> mkOgmiosRequest
161+ (mempoolSnapshotNextTxCall ms)
162+ _.mempoolNextTx
163+ unit
164+ for mbTx \(Ogmios.MempoolTransaction { raw }) -> do
165+ byteArray <- liftMaybe (error " Failed to decode transaction" )
166+ $ hexToByteArray raw
167+ liftMaybe (error " Failed to decode tx" )
168+ $ decodeCbor
169+ $ wrap byteArray
170+
171+ -- | Builds an Ogmios request action using `MempoolM`
172+ mkOgmiosRequest
173+ :: forall (request :: Type ) (response :: Type )
174+ . JsonRpc2.JsonRpc2Call request response
175+ -> (OgmiosListeners -> ListenerSet request response )
176+ -> request
177+ -> MempoolM response
178+ mkOgmiosRequest jsonRpc2Call getLs inp = do
179+ listeners' <- asks $ listeners <<< _.ogmiosWs
180+ websocket <- asks $ underlyingWebSocket <<< _.ogmiosWs
181+ mkRequest listeners' websocket jsonRpc2Call getLs inp
182+
183+ mkRequest
184+ :: forall (request :: Type ) (response :: Type ) (listeners :: Type )
185+ . listeners
186+ -> JsWebSocket
187+ -> JsonRpc2.JsonRpc2Call request response
188+ -> (listeners -> ListenerSet request response )
189+ -> request
190+ -> MempoolM response
191+ mkRequest listeners' ws jsonRpc2Call getLs inp = do
192+ logger <- getLogger
193+ liftAff $ mkRequestAff listeners' ws logger jsonRpc2Call getLs inp
194+ where
195+ getLogger :: MempoolM Logger
196+ getLogger = do
197+ logLevel <- asks $ _.logLevel
198+ mbCustomLogger <- asks $ _.customLogger
199+ pure $ mkLogger logLevel mbCustomLogger
200+
0 commit comments