Skip to content

Commit 50bfdbe

Browse files
committed
troupe: add after in favor of receiveTimeout
This patch simplifies the `receive*` machinery, introducing an `after` `Match` which uses STM's `registerDelay` to implement a `receive` with timeouts, instead of relying on `CQueue`s `Timeout` and `NonBlocking` implementations. This should ease reworking `CQueue`, since there should be no need for a distinction between blocking, timeout and non-blocking behaviour in its implementation. Hence, from now on, `troupe` only uses `CQueue`s `Blocking` `dequeue` method. See: #12 See: #11
1 parent fbd351e commit 50bfdbe

File tree

4 files changed

+169
-66
lines changed

4 files changed

+169
-66
lines changed

troupe/src/Troupe.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ module Troupe
2424
send,
2525
sendLazy,
2626
receive,
27-
receiveTimeout,
2827
expect,
2928
Match,
3029
match,
3130
matchIf,
31+
after,
3232

3333
-- ** Linking and monitoring processes
3434
link,
@@ -101,6 +101,7 @@ import Troupe.Process
101101
SpawnOptions (..),
102102
ThreadAffinity (..),
103103
WithMonitor (..),
104+
after,
104105
demonitor,
105106
exit,
106107
getProcessOption,
@@ -111,7 +112,6 @@ import Troupe.Process
111112
newNodeContext,
112113
newProcessContext,
113114
receive,
114-
receiveTimeout,
115115
runProcess,
116116
self,
117117
send,

troupe/src/Troupe/Process.hs

Lines changed: 68 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ module Troupe.Process
3939
send,
4040
sendLazy,
4141
receive,
42-
receiveTimeout,
4342
Match,
4443
matchIf,
44+
after,
4545
)
4646
where
4747

@@ -69,6 +69,7 @@ import Control.Concurrent.STM
6969
readTMVar,
7070
readTQueue,
7171
readTVar,
72+
registerDelay,
7273
throwSTM,
7374
tryReadTMVar,
7475
writeTMVar,
@@ -79,7 +80,7 @@ import Control.DeepSeq (NFData, deepseq, ($!!))
7980
import Control.Distributed.Process.Internal.CQueue
8081
( BlockSpec (..),
8182
CQueue,
82-
MatchOn (MatchMsg),
83+
MatchOn (..),
8384
dequeue,
8485
enqueueSTM,
8586
newCQueue,
@@ -99,7 +100,7 @@ import Control.Exception.Safe
99100
uninterruptibleMask_,
100101
withException,
101102
)
102-
import Control.Monad (MonadPlus, unless, void, when)
103+
import Control.Monad (MonadPlus, forM, unless, when)
103104
import Control.Monad.Error.Class (MonadError)
104105
import Control.Monad.Fix (MonadFix)
105106
import Control.Monad.IO.Class (MonadIO, liftIO)
@@ -126,7 +127,6 @@ import qualified Control.Monad.Trans.Writer.CPS as CPS (WriterT)
126127
import qualified Control.Monad.Trans.Writer.Lazy as Lazy (WriterT)
127128
import qualified Control.Monad.Trans.Writer.Strict as Strict (WriterT)
128129
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
129-
import Data.Functor.Identity (Identity (..), runIdentity)
130130
import Data.Maybe (isJust)
131131
import Data.Typeable (Typeable)
132132
import DeferredFolds.UnfoldlM (forM_)
@@ -488,11 +488,10 @@ demonitor :: (MonadProcess r m, MonadIO m) => [DemonitorOption] -> MonitorRef ->
488488
demonitor !options !ref = do
489489
liftMonadProcess id $ demonitorSTM ref
490490
when (DemonitorFlush `elem` options) $ do
491-
void $
492-
receiveTimeout
493-
0
494-
[ matchIf (\d -> downMonitorRef d == ref) (\_ -> pure ())
495-
]
491+
receive
492+
[ matchIf (\d -> downMonitorRef d == ref) (\_ -> pure ()),
493+
after 0 (pure ())
494+
]
496495
{-# SPECIALIZE demonitor :: [DemonitorOption] -> MonitorRef -> Process r () #-}
497496

498497
exitSTM :: (Exception e) => ProcessId -> Maybe e -> ReaderT (ProcessEnv r) STM ()
@@ -631,85 +630,95 @@ sendLazy = sendWithOptions SendOptions
631630
{-# INLINE sendLazy #-}
632631
{-# SPECIALIZE sendLazy :: (Typeable a) => ProcessId -> a -> Process r () #-}
633632

634-
data ReceiveMethod f where
635-
ReceiveBlocking :: ReceiveMethod Identity
636-
ReceiveNonBlocking :: ReceiveMethod Maybe
637-
ReceiveTimeout :: Int -> ReceiveMethod Maybe
638-
639-
{- HLINT ignore ReceiveOptions "Use newtype instead of data" -}
640-
data ReceiveOptions f = ReceiveOptions
641-
{ receiveOptionsMethod :: !(ReceiveMethod f)
642-
}
633+
data ReceiveOptions = ReceiveOptions
643634

644635
-- | Matching clause for a value of type @a@ in monadic context @m@.
645-
newtype Match m a
636+
data Match m a
646637
= MatchMessage (Dynamic -> Maybe (m a))
638+
| MatchAfter Int (m a)
647639
deriving (Functor)
648640

649-
receiveWithOptions :: (MonadProcess r m, MonadIO m) => ReceiveOptions f -> [Match m a] -> m (f a)
650-
receiveWithOptions !options !matches = do
641+
receiveWithOptions :: (MonadProcess r m, MonadIO m) => ReceiveOptions -> [Match m a] -> m a
642+
receiveWithOptions ReceiveOptions !matches = do
651643
queue <- processContextQueue . processEnvProcessContext <$> getProcessEnv
652-
let bs = case receiveOptionsMethod options of
653-
ReceiveBlocking -> Blocking
654-
ReceiveNonBlocking -> NonBlocking
655-
ReceiveTimeout t -> Timeout t
656-
matches' = map (\(MatchMessage fn) -> MatchMsg fn) matches
657-
p <- liftIO $ dequeue queue bs matches'
644+
645+
p <- liftIO $ do
646+
matches' <- forM matches $ \case
647+
MatchMessage fn -> pure (MatchMsg fn)
648+
MatchAfter t ma -> case t of
649+
0 -> pure $ MatchChan $ pure ma
650+
t' -> do
651+
tv <- registerDelay t'
652+
pure $ MatchChan $ do
653+
v <- readTVar tv
654+
check v
655+
pure ma
656+
657+
dequeue queue Blocking matches'
658658

659659
ensureSignalsDelivered
660660

661661
case p of
662-
Nothing -> case receiveOptionsMethod options of
663-
ReceiveBlocking -> error "receiveWithOptions: dequeue returned Nothing in Blocking call"
664-
ReceiveNonBlocking -> pure Nothing
665-
ReceiveTimeout _ -> pure Nothing
666-
Just a -> case receiveOptionsMethod options of
667-
ReceiveBlocking -> Identity <$> a
668-
ReceiveNonBlocking -> Just <$> a
669-
ReceiveTimeout _ -> Just <$> a
662+
Nothing -> error "receiveWithOptions: dequeue returned Nothing"
663+
Just ma -> ma
670664
where
671665
ensureSignalsDelivered = do
672666
exceptions <- processContextExceptions . processEnvProcessContext <$> getProcessEnv
673667
liftIO $ atomically $ do
674668
e <- isEmptyTQueue exceptions
675669
check e
676-
{-# SPECIALIZE receiveWithOptions :: ReceiveOptions f -> [Match (Process r) a] -> Process r (f a) #-}
670+
{-# SPECIALIZE receiveWithOptions :: ReceiveOptions -> [Match (Process r) a] -> Process r a #-}
677671

678672
-- | Receive some message from the process mailbox, blocking.
679673
receive :: (MonadProcess r m, MonadIO m) => [Match m a] -> m a
680-
receive !matches = runIdentity <$> receiveWithOptions options matches
681-
where
682-
options =
683-
ReceiveOptions
684-
{ receiveOptionsMethod = ReceiveBlocking
685-
}
674+
receive !matches = receiveWithOptions ReceiveOptions matches
686675
{-# INLINE receive #-}
687676
{-# SPECIALIZE receive :: [Match (Process r) a] -> Process r a #-}
688677

689-
-- | Receive some message from the process mailbox.
690-
--
691-
-- If the given timeout is @0@, this works in a non-blocking way. Otherwise,
692-
-- the call will time out after the given number of microseconds.
693-
--
694-
-- If no message is matched within the timeout period, 'Nothing' is returned,
695-
-- otherwise @'Just' a@.
696-
receiveTimeout :: (MonadProcess r m, MonadIO m) => Int -> [Match m a] -> m (Maybe a)
697-
receiveTimeout !t = receiveWithOptions options
698-
where
699-
options =
700-
ReceiveOptions
701-
{ receiveOptionsMethod = if t == 0 then ReceiveNonBlocking else ReceiveTimeout t
702-
}
703-
{-# INLINE receiveTimeout #-}
704-
{-# SPECIALIZE receiveTimeout :: Int -> [Match (Process r) a] -> Process r (Maybe a) #-}
705-
706678
-- | Match any message meeting some predicate of a specific type.
707679
matchIf :: (Typeable a) => (a -> Bool) -> (a -> m b) -> Match m b
708680
matchIf predicate handle = MatchMessage $ \dyn -> case fromDynamic dyn of
709681
Nothing -> Nothing
710682
Just a -> if predicate a then Just (handle a) else Nothing
711683
{-# INLINE matchIf #-}
712684

685+
-- | A 'Match' which doesn't receive any messages, but fires after a given
686+
-- amount of time.
687+
--
688+
-- Instead of looking for a message in the process' mailbox, an 'after' clause
689+
-- in a call to 'receive' will fire after a given number of microseconds,
690+
-- yielding the provided monadic value. This can be used to implement receiving
691+
-- messages with a timeout.
692+
--
693+
-- When the given timeout is @0@, the 'receive' call will be non-blocking.
694+
-- Note, however, the order of matches is important, so
695+
--
696+
-- @
697+
-- s <- self
698+
-- send s ()
699+
-- receive [after 0 (pure "timeout"), match (\() -> pure "message")]
700+
-- @
701+
--
702+
-- will always return @"timeout"@, whilst
703+
--
704+
-- @
705+
-- s <- self
706+
-- send s ()
707+
-- receive [match (\() -> pure "message"), after 0 (pure "timeout")]
708+
-- @
709+
--
710+
-- will always return @"message"@.
711+
--
712+
-- In general, @'after'@ should be the last 'Match' passed to 'receive'.
713+
after ::
714+
-- | Timeout in microseconds. Use @0@ for a non-blocking 'receive'.
715+
Int ->
716+
-- | Action to call when the timeout expired.
717+
m a ->
718+
Match m a
719+
after = MatchAfter
720+
{-# INLINE after #-}
721+
713722
spawnImpl :: (MonadProcess r m, MonadIO m) => ThreadAffinity -> (ProcessId -> ReaderT (ProcessEnv r) STM t) -> Process r a -> m t
714723
spawnImpl affinity cb process = do
715724
currentEnv <- getProcessEnv

troupe/test/Troupe/Test.hs

Lines changed: 98 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ import Control.Monad (forever)
3434
import Control.Monad.IO.Class (liftIO)
3535
import Data.Typeable (Typeable)
3636
import GHC.Generics (Generic)
37+
import System.Clock (Clock (Monotonic), diffTimeSpec, getTime)
3738
import Test.Tasty (TestTree, testGroup)
38-
import Test.Tasty.HUnit (Assertion, assertFailure, testCase, (@?=))
39+
import Test.Tasty.HUnit (Assertion, assertBool, assertFailure, testCase, (@?=))
3940
import Troupe
4041
( DemonitorOption (..),
4142
Down (..),
@@ -48,6 +49,7 @@ import Troupe
4849
SpawnOptions (..),
4950
ThreadAffinity (..),
5051
WithMonitor (..),
52+
after,
5153
demonitor,
5254
exit,
5355
expect,
@@ -58,7 +60,6 @@ import Troupe
5860
matchIf,
5961
monitor,
6062
receive,
61-
receiveTimeout,
6263
runNode,
6364
self,
6465
send,
@@ -183,7 +184,7 @@ tests =
183184
Nothing -> assertFailure $ "Expected downReason to be a TestException: " <> show dr
184185
Just TestException -> pure ()
185186

186-
receiveTimeout 0 [match pure] >>= \case
187+
receive [match (fmap Just), after 0 (pure Nothing)] >>= \case
187188
Nothing -> pure ()
188189
Just Down {} -> liftIO $ assertFailure "unexpected Down message",
189190
testGroup
@@ -263,7 +264,7 @@ tests =
263264
demonitor [] ref
264265
liftIO $ putMVar m ()
265266
Exit _ _ _ Nothing <- expect
266-
receiveTimeout 0 [matchMonitor ref] >>= \res -> liftIO $ do
267+
receive [Just <$> matchMonitor ref, after 0 (pure Nothing)] >>= \res -> liftIO $ do
267268
res @?= Nothing,
268269
testCase "DemonitorFlush" $ troupeTest () $ do
269270
m <- liftIO newEmptyMVar
@@ -278,7 +279,7 @@ tests =
278279

279280
receive [matchMonitor ref]
280281
demonitor [DemonitorFlush] ref2
281-
receiveTimeout 0 [matchMonitor ref2] >>= \res -> liftIO $ do
282+
receive [Just <$> matchMonitor ref2, after 0 (pure Nothing)] >>= \res -> liftIO $ do
282283
res @?= Nothing
283284
],
284285
testGroup
@@ -487,6 +488,98 @@ tests =
487488

488489
a' <- isProcessAlive pid
489490
liftIO $ a' @?= False,
491+
testGroup
492+
"receive"
493+
[ testCase "after 1000" $ troupeTest () $ do
494+
pid <- spawnLink $ do
495+
start <- liftIO $ getTime Monotonic
496+
r <-
497+
receive
498+
[ match (\() -> pure "message"),
499+
after 1000 (pure "after"),
500+
after 1000000 (pure "long after")
501+
]
502+
end <- liftIO $ getTime Monotonic
503+
504+
liftIO $ do
505+
r @?= "after"
506+
507+
let nsPerUs = 1000
508+
minDelay = 1000 * nsPerUs
509+
maxDelay = (1000000 `div` 2) * nsPerUs
510+
diff = diffTimeSpec end start
511+
512+
assertBool
513+
("Unexpectedly short delay: " <> show diff)
514+
(diff >= minDelay)
515+
516+
assertBool
517+
("Unexpectedly long delay: " <> show diff)
518+
(diff < maxDelay)
519+
520+
ref <- monitor pid
521+
awaitProcessExit ref,
522+
testCase "after 1000, with message" $ troupeTest () $ do
523+
pid <- spawnLink $ do
524+
start <- liftIO $ getTime Monotonic
525+
r <-
526+
receive
527+
[ match (\() -> pure "message"),
528+
after 100000000 (pure "after")
529+
]
530+
end <- liftIO $ getTime Monotonic
531+
532+
liftIO $ do
533+
r @?= "message"
534+
535+
let diff = diffTimeSpec end start
536+
maxDelay = 100000000 * 1000
537+
assertBool ("Unexpectedly long delay: " <> show diff) (diff < maxDelay)
538+
539+
ref <- monitor pid
540+
send pid ()
541+
awaitProcessExit ref,
542+
testGroup
543+
"after 0"
544+
[ testCase "No message" $ troupeTest () $ do
545+
pid <- spawnLink $ do
546+
r <-
547+
receive
548+
[ match (\() -> pure "message"),
549+
after 0 (pure "after")
550+
]
551+
liftIO $ r @?= "after"
552+
553+
ref <- monitor pid
554+
awaitProcessExit ref,
555+
testCase "Message" $ troupeTest () $ do
556+
pid <- spawnLink $ do
557+
s <- self
558+
send s ()
559+
r <-
560+
receive
561+
[ match (\() -> pure "message"),
562+
after 0 (pure "after")
563+
]
564+
liftIO $ r @?= "message"
565+
566+
ref <- monitor pid
567+
awaitProcessExit ref,
568+
testCase "ordering retained: after before match" $ troupeTest () $ do
569+
pid <- spawnLink $ do
570+
s <- self
571+
send s ()
572+
r <-
573+
receive
574+
[ after 0 (pure "after"),
575+
match (\() -> pure "message")
576+
]
577+
liftIO $ r @?= "after"
578+
579+
ref <- monitor pid
580+
awaitProcessExit ref
581+
]
582+
],
490583
testGroup
491584
"Non-regression"
492585
[ testCase "Deliver signals before/when receiving messages (#25)" $ troupeTest () $ do

troupe/troupe.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ test-suite troupe-test
9696
ghc-options: -rtsopts -threaded -with-rtsopts=-N2
9797
build-depends:
9898
, base ^>=4.17.0.0 || ^>=4.18.0.0
99+
, clock ^>=0.8.3
99100
, deepseq ^>=1.4.8.0
100101
, safe-exceptions ^>=0.1.7.3
101102
, tasty ^>=1.4.3

0 commit comments

Comments
 (0)