Skip to content

Commit 5c601fe

Browse files
committed
Use HTTP instead of WebSocket for Ogmios
1 parent 53f42ed commit 5c601fe

27 files changed

+2164
-964
lines changed

src/Contract/Backend/Ogmios.purs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ import Cardano.Types.CborBytes (CborBytes)
1111
import Cardano.Types.TransactionHash (TransactionHash)
1212
import Contract.Monad (Contract)
1313
import Ctl.Internal.Contract.Monad (wrapQueryM)
14-
import Ctl.Internal.QueryM (submitTxOgmios) as QueryM
1514
import Ctl.Internal.QueryM.Ogmios (SubmitTxR)
15+
import Ctl.Internal.QueryM.OgmiosHttp (submitTxOgmios) as OgmiosHttp
1616
import Ctl.Internal.QueryM.Pools (getPoolParameters) as QueryM
1717

1818
-- | **This function can only run with Ogmios backend**
@@ -26,4 +26,4 @@ getPoolParameters = wrapQueryM <<< QueryM.getPoolParameters
2626

2727
-- | Error returning variant
2828
submitTxE :: TransactionHash -> CborBytes -> Contract SubmitTxR
29-
submitTxE txhash cbor = wrapQueryM $ QueryM.submitTxOgmios txhash cbor
29+
submitTxE txhash cbor = wrapQueryM $ OgmiosHttp.submitTxOgmios txhash cbor

src/Contract/Backend/Ogmios/Mempool.purs

Lines changed: 104 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
-- | These functions only work with Ogmios backend (not Blockfrost!).
33
-- | https://ogmios.dev/mini-protocols/local-tx-monitor/
44
module Contract.Backend.Ogmios.Mempool
5-
( module Ogmios
6-
, acquireMempoolSnapshot
5+
( acquireMempoolSnapshot
76
, fetchMempoolTxs
87
, mempoolSnapshotHasTx
98
, mempoolSnapshotNextTx
@@ -19,35 +18,49 @@ import Cardano.Types.Transaction (Transaction)
1918
import Cardano.Types.TransactionHash (TransactionHash)
2019
import Contract.Monad (Contract)
2120
import Control.Monad.Error.Class (liftMaybe, try)
21+
import Control.Monad.Reader.Trans (asks)
2222
import Ctl.Internal.Contract.Monad (wrapQueryM)
23-
import Ctl.Internal.QueryM
24-
( acquireMempoolSnapshot
25-
, mempoolSnapshotHasTx
26-
, mempoolSnapshotNextTx
27-
, mempoolSnapshotSizeAndCapacity
28-
, releaseMempool
29-
) as QueryM
23+
import Ctl.Internal.Logging (Logger, mkLogger)
24+
import Ctl.Internal.QueryM (QueryM)
25+
import Ctl.Internal.QueryM.JsonRpc2 as JsonRpc2
3026
import Ctl.Internal.QueryM.Ogmios
31-
( MempoolSizeAndCapacity(MempoolSizeAndCapacity)
27+
( MempoolSizeAndCapacity
3228
, MempoolSnapshotAcquired
3329
, MempoolTransaction(MempoolTransaction)
30+
, acquireMempoolSnapshotCall
3431
) as Ogmios
32+
import Ctl.Internal.QueryM.OgmiosWebsocket.JsWebSocket (JsWebSocket)
33+
import Ctl.Internal.QueryM.OgmiosWebsocket.Mempool
34+
( mempoolSnapshotHasTxCall
35+
, mempoolSnapshotNextTxCall
36+
, mempoolSnapshotSizeAndCapacityCall
37+
, releaseMempoolCall
38+
)
39+
import Ctl.Internal.QueryM.OgmiosWebsocket.Types
40+
( ListenerSet
41+
, OgmiosListeners
42+
, listeners
43+
, mkRequestAff
44+
, underlyingWebSocket
45+
)
3546
import Data.Array as Array
3647
import Data.ByteArray (hexToByteArray)
3748
import Data.List (List(Cons))
3849
import Data.Maybe (Maybe(Just, Nothing))
50+
import Data.Newtype (unwrap)
51+
import Effect.Aff.Class (liftAff)
3952
import Effect.Exception (error)
4053

4154
-- | Establish a connection with the Local TX Monitor.
4255
-- | Instantly accquires the current mempool snapshot, and will wait for the next
4356
-- | mempool snapshot if used again before using `releaseMempool`.
4457
acquireMempoolSnapshot :: Contract Ogmios.MempoolSnapshotAcquired
45-
acquireMempoolSnapshot = wrapQueryM QueryM.acquireMempoolSnapshot
58+
acquireMempoolSnapshot = wrapQueryM acquireMempoolSnapshotFetch
4659

4760
-- | Check to see if a TxHash is present in the current mempool snapshot.
4861
mempoolSnapshotHasTx
4962
:: Ogmios.MempoolSnapshotAcquired -> TransactionHash -> Contract Boolean
50-
mempoolSnapshotHasTx ms = wrapQueryM <<< QueryM.mempoolSnapshotHasTx ms
63+
mempoolSnapshotHasTx ms = wrapQueryM <<< mempoolSnapshotHasTxFetch ms
5164

5265
-- | Get the first received TX in the current mempool snapshot. This function can
5366
-- | be recursively called to traverse the finger-tree of the mempool data set.
@@ -56,7 +69,7 @@ mempoolSnapshotNextTx
5669
:: Ogmios.MempoolSnapshotAcquired
5770
-> Contract (Maybe Transaction)
5871
mempoolSnapshotNextTx mempoolAcquired = do
59-
mbTx <- wrapQueryM $ QueryM.mempoolSnapshotNextTx mempoolAcquired
72+
mbTx <- wrapQueryM $ mempoolSnapshotNextTxFetch mempoolAcquired
6073
for mbTx \(Ogmios.MempoolTransaction { raw }) -> do
6174
byteArray <- liftMaybe (error "Failed to decode transaction")
6275
$ hexToByteArray raw
@@ -69,12 +82,12 @@ mempoolSnapshotNextTx mempoolAcquired = do
6982
mempoolSnapshotSizeAndCapacity
7083
:: Ogmios.MempoolSnapshotAcquired -> Contract Ogmios.MempoolSizeAndCapacity
7184
mempoolSnapshotSizeAndCapacity = wrapQueryM <<<
72-
QueryM.mempoolSnapshotSizeAndCapacity
85+
mempoolSnapshotSizeAndCapacityFetch
7386

7487
-- | Release the connection to the Local TX Monitor.
7588
releaseMempool
7689
:: Ogmios.MempoolSnapshotAcquired -> Contract Unit
77-
releaseMempool = wrapQueryM <<< QueryM.releaseMempool
90+
releaseMempool = wrapQueryM <<< releaseMempoolFetch
7891

7992
-- | A bracket-style function for working with mempool snapshots - ensures
8093
-- | release in the presence of exceptions
@@ -100,3 +113,79 @@ fetchMempoolTxs ms = Array.fromFoldable <$> go
100113
case nextTX of
101114
Just tx -> Cons tx <$> go
102115
Nothing -> pure mempty
116+
117+
acquireMempoolSnapshotFetch
118+
:: QueryM Ogmios.MempoolSnapshotAcquired
119+
acquireMempoolSnapshotFetch =
120+
mkOgmiosRequest
121+
Ogmios.acquireMempoolSnapshotCall
122+
_.acquireMempool
123+
unit
124+
125+
mempoolSnapshotHasTxFetch
126+
:: Ogmios.MempoolSnapshotAcquired
127+
-> TransactionHash
128+
-> QueryM Boolean
129+
mempoolSnapshotHasTxFetch ms txh =
130+
unwrap <$> mkOgmiosRequest
131+
(mempoolSnapshotHasTxCall ms)
132+
_.mempoolHasTx
133+
txh
134+
135+
mempoolSnapshotSizeAndCapacityFetch
136+
:: Ogmios.MempoolSnapshotAcquired
137+
-> QueryM Ogmios.MempoolSizeAndCapacity
138+
mempoolSnapshotSizeAndCapacityFetch ms =
139+
mkOgmiosRequest
140+
(mempoolSnapshotSizeAndCapacityCall ms)
141+
_.mempoolSizeAndCapacity
142+
unit
143+
144+
releaseMempoolFetch
145+
:: Ogmios.MempoolSnapshotAcquired
146+
-> QueryM Unit
147+
releaseMempoolFetch ms =
148+
unit <$ mkOgmiosRequest
149+
(releaseMempoolCall ms)
150+
_.releaseMempool
151+
unit
152+
153+
mempoolSnapshotNextTxFetch
154+
:: Ogmios.MempoolSnapshotAcquired
155+
-> QueryM (Maybe Ogmios.MempoolTransaction)
156+
mempoolSnapshotNextTxFetch ms =
157+
unwrap <$> mkOgmiosRequest
158+
(mempoolSnapshotNextTxCall ms)
159+
_.mempoolNextTx
160+
unit
161+
162+
-- | Builds an Ogmios request action using `QueryM`
163+
mkOgmiosRequest
164+
:: forall (request :: Type) (response :: Type)
165+
. JsonRpc2.JsonRpc2Call request response
166+
-> (OgmiosListeners -> ListenerSet request response)
167+
-> request
168+
-> QueryM response
169+
mkOgmiosRequest jsonRpc2Call getLs inp = do
170+
listeners' <- asks $ listeners <<< _.ogmiosWs <<< _.runtime
171+
websocket <- asks $ underlyingWebSocket <<< _.ogmiosWs <<< _.runtime
172+
mkRequest listeners' websocket jsonRpc2Call getLs inp
173+
174+
mkRequest
175+
:: forall (request :: Type) (response :: Type) (listeners :: Type)
176+
. listeners
177+
-> JsWebSocket
178+
-> JsonRpc2.JsonRpc2Call request response
179+
-> (listeners -> ListenerSet request response)
180+
-> request
181+
-> QueryM response
182+
mkRequest listeners' ws jsonRpc2Call getLs inp = do
183+
logger <- getLogger
184+
liftAff $ mkRequestAff listeners' ws logger jsonRpc2Call getLs inp
185+
where
186+
getLogger :: QueryM Logger
187+
getLogger = do
188+
logLevel <- asks $ _.config >>> _.logLevel
189+
mbCustomLogger <- asks $ _.config >>> _.customLogger
190+
pure $ mkLogger logLevel mbCustomLogger
191+

src/Internal/Contract/Monad.purs

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,18 +54,21 @@ import Ctl.Internal.Contract.ProviderBackend
5454
, getCtlBackend
5555
)
5656
import Ctl.Internal.Helpers (filterMapWithKeyM, liftM, logWithLevel)
57-
import Ctl.Internal.JsWebSocket (_wsClose, _wsFinalize)
5857
import Ctl.Internal.Logging (Logger, mkLogger, setupLogs)
59-
import Ctl.Internal.QueryM
60-
( QueryEnv
61-
, QueryM
62-
, WebSocket
63-
, getProtocolParametersAff
64-
, getSystemStartAff
58+
import Ctl.Internal.QueryM (QueryM)
59+
import Ctl.Internal.QueryM.JsonRpc2 (OgmiosDecodeError, pprintOgmiosDecodeError)
60+
import Ctl.Internal.QueryM.Kupo (isTxConfirmedAff)
61+
import Ctl.Internal.QueryM.OgmiosHttp
62+
( getProtocolParameters
63+
, getSystemStartTime
64+
)
65+
import Ctl.Internal.QueryM.OgmiosWebsocket.JsWebSocket (_wsClose, _wsFinalize)
66+
import Ctl.Internal.QueryM.OgmiosWebsocket.Queries (QueryEnv)
67+
import Ctl.Internal.QueryM.OgmiosWebsocket.Types
68+
( WebSocket
6569
, mkOgmiosWebSocketAff
6670
, underlyingWebSocket
6771
)
68-
import Ctl.Internal.QueryM.Kupo (isTxConfirmedAff)
6972
import Ctl.Internal.Service.Blockfrost
7073
( BlockfrostServiceM
7174
, runBlockfrostServiceM
@@ -290,10 +293,32 @@ getLedgerConstants
290293
-> ProviderBackend
291294
-> Aff LedgerConstants
292295
getLedgerConstants params = case _ of
293-
CtlBackend { ogmios: { ws } } _ ->
294-
{ pparams: _, systemStart: _ }
295-
<$> (unwrap <$> getProtocolParametersAff ws logger)
296-
<*> getSystemStartAff ws logger
296+
CtlBackend ctlBackend _ -> do
297+
let
298+
logParams =
299+
{ logLevel: params.logLevel
300+
, customLogger: params.customLogger
301+
, suppressLogs: true
302+
}
303+
pparams <- unwrap <$>
304+
( runQueryM logParams ctlBackend getProtocolParameters >>=
305+
throwOnLeft
306+
)
307+
systemStart <- unwrap <$>
308+
( runQueryM logParams ctlBackend getSystemStartTime >>=
309+
throwOnLeft
310+
)
311+
pure { pparams, systemStart }
312+
313+
where
314+
throwOnLeft
315+
:: forall a
316+
. Either OgmiosDecodeError a
317+
-> Aff a
318+
throwOnLeft = case _ of
319+
Left err -> throwError $ error $ pprintOgmiosDecodeError err
320+
Right x -> pure x
321+
297322
BlockfrostBackend backend _ ->
298323
runBlockfrostServiceM blockfrostLogger backend $
299324
{ pparams: _, systemStart: _ }
@@ -458,7 +483,8 @@ mkQueryEnv
458483
:: forall (rest :: Row Type). LogParams rest -> CtlBackend -> QueryEnv
459484
mkQueryEnv params ctlBackend =
460485
{ config:
461-
{ kupoConfig: ctlBackend.kupoConfig
486+
{ ogmiosConfig: ctlBackend.ogmios.config
487+
, kupoConfig: ctlBackend.kupoConfig
462488
, logLevel: params.logLevel
463489
, customLogger: params.customLogger
464490
, suppressLogs: params.suppressLogs
@@ -480,3 +506,4 @@ filterLockedUtxos utxos =
480506

481507
withTxRefsCache :: forall (a :: Type). ReaderT UsedTxOuts Aff a -> Contract a
482508
withTxRefsCache = Contract <<< withReaderT _.usedTxOuts
509+

src/Internal/Contract/Provider.purs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ import Ctl.Internal.Contract.LogParams (LogParams)
1616
import Ctl.Internal.Contract.ProviderBackend (BlockfrostBackend, CtlBackend)
1717
import Ctl.Internal.Helpers (logWithLevel)
1818
import Ctl.Internal.QueryM (QueryM)
19-
import Ctl.Internal.QueryM (evaluateTxOgmios, getChainTip, submitTxOgmios) as QueryM
20-
import Ctl.Internal.QueryM.CurrentEpoch (getCurrentEpoch) as QueryM
21-
import Ctl.Internal.QueryM.EraSummaries (getEraSummaries) as QueryM
19+
import Ctl.Internal.QueryM (evaluateTxOgmios) as QueryM
20+
import Ctl.Internal.QueryM.CurrentEpoch (getCurrentEpoch) as OgmiosHttp
21+
import Ctl.Internal.QueryM.EraSummaries (getEraSummaries) as OgmiosHttp
2222
import Ctl.Internal.QueryM.Kupo
2323
( getDatumByHash
2424
, getOutputAddressesByTxHash
@@ -29,11 +29,15 @@ import Ctl.Internal.QueryM.Kupo
2929
, utxosAt
3030
) as Kupo
3131
import Ctl.Internal.QueryM.Ogmios (SubmitTxR(SubmitFail, SubmitTxSuccess))
32+
import Ctl.Internal.QueryM.OgmiosHttp
33+
( getChainTip
34+
, submitTxOgmios
35+
) as OgmiosHttp
3236
import Ctl.Internal.QueryM.Pools
3337
( getPoolIds
3438
, getPubKeyHashDelegationsAndRewards
3539
, getValidatorHashDelegationsAndRewards
36-
) as QueryM
40+
) as OgmiosHttp
3741
import Ctl.Internal.Service.Blockfrost
3842
( BlockfrostServiceM
3943
, runBlockfrostServiceM
@@ -59,13 +63,13 @@ providerForCtlBackend runQueryM params backend =
5963
, doesTxExist: runQueryM' <<< map (map isJust) <<< Kupo.isTxConfirmed
6064
, getTxAuxiliaryData: runQueryM' <<< Kupo.getTxAuxiliaryData
6165
, utxosAt: runQueryM' <<< Kupo.utxosAt
62-
, getChainTip: Right <$> runQueryM' QueryM.getChainTip
63-
, getCurrentEpoch: unwrap <$> runQueryM' QueryM.getCurrentEpoch
66+
, getChainTip: Right <$> runQueryM' OgmiosHttp.getChainTip
67+
, getCurrentEpoch: unwrap <$> runQueryM' OgmiosHttp.getCurrentEpoch
6468
, submitTx: \tx -> runQueryM' do
6569
let txHash = Transaction.hash tx
6670
logDebug' $ "Pre-calculated tx hash: " <> show txHash
6771
let txCborBytes = encodeCbor tx
68-
result <- QueryM.submitTxOgmios txHash txCborBytes
72+
result <- OgmiosHttp.submitTxOgmios txHash txCborBytes
6973
pure $ case result of
7074
SubmitTxSuccess th -> do
7175
if th == txHash then Right th
@@ -74,17 +78,18 @@ providerForCtlBackend runQueryM params backend =
7478
"Computed TransactionHash is not equal to the one returned by Ogmios, please report as bug!"
7579
)
7680
SubmitFail err -> Left $ ClientOtherError $ show err
77-
, evaluateTx: \tx additionalUtxos -> unwrap <$> runQueryM' do
78-
let txBytes = encodeCbor tx
79-
QueryM.evaluateTxOgmios txBytes (wrap additionalUtxos)
80-
, getEraSummaries: Right <$> runQueryM' QueryM.getEraSummaries
81-
, getPoolIds: Right <$> runQueryM' QueryM.getPoolIds
81+
, evaluateTx: \tx additionalUtxos -> unwrap <$>
82+
runQueryM' do
83+
let txBytes = encodeCbor tx
84+
QueryM.evaluateTxOgmios txBytes (wrap additionalUtxos)
85+
, getEraSummaries: Right <$> runQueryM' OgmiosHttp.getEraSummaries
86+
, getPoolIds: Right <$> runQueryM' OgmiosHttp.getPoolIds
8287
, getPubKeyHashDelegationsAndRewards: \_ pubKeyHash ->
8388
Right <$> runQueryM'
84-
(QueryM.getPubKeyHashDelegationsAndRewards pubKeyHash)
89+
(OgmiosHttp.getPubKeyHashDelegationsAndRewards pubKeyHash)
8590
, getValidatorHashDelegationsAndRewards: \_ validatorHash ->
8691
Right <$> runQueryM'
87-
(QueryM.getValidatorHashDelegationsAndRewards $ wrap validatorHash)
92+
(OgmiosHttp.getValidatorHashDelegationsAndRewards $ wrap validatorHash)
8893
}
8994

9095
where

src/Internal/Contract/ProviderBackend.purs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ module Ctl.Internal.Contract.ProviderBackend
1515

1616
import Prelude
1717

18-
import Ctl.Internal.QueryM (OgmiosWebSocket)
18+
import Ctl.Internal.QueryM.OgmiosWebsocket.Types (OgmiosWebSocket)
1919
import Ctl.Internal.ServerConfig (ServerConfig)
2020
import Data.Maybe (Maybe(Just, Nothing))
2121
import Data.Time.Duration (Seconds(Seconds))

0 commit comments

Comments
 (0)