From d30ddbd54a79b036e42a840878d0934651c16597 Mon Sep 17 00:00:00 2001 From: Nicolas Trangez Date: Mon, 27 Mar 2023 11:10:10 +0200 Subject: [PATCH 1/8] Stash --- troupe/src/Troupe/Queue.hs | 34 ++++++++++++++++++++++++++++++++++ troupe/troupe.cabal | 2 ++ 2 files changed, 36 insertions(+) create mode 100644 troupe/src/Troupe/Queue.hs diff --git a/troupe/src/Troupe/Queue.hs b/troupe/src/Troupe/Queue.hs new file mode 100644 index 0000000..3d19d31 --- /dev/null +++ b/troupe/src/Troupe/Queue.hs @@ -0,0 +1,34 @@ +{-# LANGUAGE DeriveFunctor #-} + +module Troupe.Queue (newQueue, enqueue, dequeue) where + +import Control.Concurrent.Classy.STM (MonadSTM, TQueue, TVar, flushTQueue, modifyTVar', newTQueue, newTVar, writeTQueue) +import Control.Monad.Conc.Class (MonadConc, STM, atomically) + +data Queue stm a = Queue + { queueTQueue :: {-# UNPACK #-} !(TQueue stm a), + queueMessages :: {-# UNPACK #-} !(TVar stm [a]) + } + +newQueue :: (MonadSTM stm) => stm (Queue stm a) +newQueue = + Queue + <$> newTQueue + <*> newTVar [] + +enqueue :: (MonadSTM stm) => Queue stm a -> a -> stm () +enqueue queue a = writeTQueue (queueTQueue queue) a + +data Match stm m a + = MatchMessage (m -> Maybe a) + | MatchSTM (stm a) + deriving (Functor) + +dequeue :: (MonadConc m) => Queue (STM m) a -> [Match (STM m) m' a] -> m (Maybe a) +dequeue queue matches = do + atomically getMessages + atomically handleMatches + where + getMessages = do + newMessages <- flushTQueue (queueTQueue queue) + modifyTVar' (queueMessages queue) (\old -> old ++ newMessages) diff --git a/troupe/troupe.cabal b/troupe/troupe.cabal index 1049d1e..5a9beac 100644 --- a/troupe/troupe.cabal +++ b/troupe/troupe.cabal @@ -45,11 +45,13 @@ library other-modules: Troupe.Exceptions Troupe.Process + Troupe.Queue Troupe.Types build-depends: , async ^>=2.2.4 , base ^>=4.17.0.0 || ^>=4.18.0.0 + , concurrency ^>=1.11.0.2 , deepseq ^>=1.4.8.0 , deferred-folds ^>=0.9.18.3 , hashable ^>=1.4.2.0 From 543fce877dfc8daae2104d13858326f13a981eac Mon Sep 17 00:00:00 2001 From: Nicolas Trangez Date: Mon, 27 Mar 2023 17:08:28 +0200 Subject: [PATCH 2/8] Stash rework of `CQueue` See: https://github.com/NicolasT/troupe/issues/11 --- troupe/src/Troupe/Queue.hs | 74 +++++++++++++++++-- troupe/test/Troupe/Queue/Test.hs | 122 +++++++++++++++++++++++++++++++ troupe/test/troupe-test.hs | 4 +- troupe/troupe.cabal | 14 +++- 4 files changed, 202 insertions(+), 12 deletions(-) create mode 100644 troupe/test/Troupe/Queue/Test.hs diff --git a/troupe/src/Troupe/Queue.hs b/troupe/src/Troupe/Queue.hs index 3d19d31..f5e08c6 100644 --- a/troupe/src/Troupe/Queue.hs +++ b/troupe/src/Troupe/Queue.hs @@ -1,13 +1,37 @@ {-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE LambdaCase #-} -module Troupe.Queue (newQueue, enqueue, dequeue) where +module Troupe.Queue + ( Queue, + newQueue, + enqueue, + Match (..), + dequeue, + ) +where -import Control.Concurrent.Classy.STM (MonadSTM, TQueue, TVar, flushTQueue, modifyTVar', newTQueue, newTVar, writeTQueue) +import Control.Concurrent.Classy.STM + ( MonadSTM, + TQueue, + TVar, + flushTQueue, + modifyTVar', + newTQueue, + newTVar, + orElse, + readTVar, + retry, + tryReadTQueue, + unGetTQueue, + writeTQueue, + writeTVar, + ) +import qualified Control.Concurrent.STM as CCS import Control.Monad.Conc.Class (MonadConc, STM, atomically) data Queue stm a = Queue { queueTQueue :: {-# UNPACK #-} !(TQueue stm a), - queueMessages :: {-# UNPACK #-} !(TVar stm [a]) + queueMessages :: !(TVar stm [a]) } newQueue :: (MonadSTM stm) => stm (Queue stm a) @@ -15,20 +39,54 @@ newQueue = Queue <$> newTQueue <*> newTVar [] +{-# SPECIALIZE newQueue :: CCS.STM (Queue CCS.STM a) #-} enqueue :: (MonadSTM stm) => Queue stm a -> a -> stm () enqueue queue a = writeTQueue (queueTQueue queue) a +{-# INLINE enqueue #-} +{-# SPECIALIZE enqueue :: Queue CCS.STM a -> a -> CCS.STM () #-} -data Match stm m a - = MatchMessage (m -> Maybe a) - | MatchSTM (stm a) +data Match stm a b + = MatchMessage (a -> Maybe b) + | MatchSTM (stm b) deriving (Functor) -dequeue :: (MonadConc m) => Queue (STM m) a -> [Match (STM m) m' a] -> m (Maybe a) +dequeue :: (MonadConc m) => Queue (STM m) a -> [Match (STM m) a b] -> m b dequeue queue matches = do atomically getMessages - atomically handleMatches + atomically $ + handleExistingMessages >>= \case + Nothing -> handleNewMessages + Just b -> pure b where getMessages = do newMessages <- flushTQueue (queueTQueue queue) modifyTVar' (queueMessages queue) (\old -> old ++ newMessages) + handleExistingMessages = do + messages <- readTVar (queueMessages queue) + foldr orElse (pure Nothing) $ flip map matches $ \case + MatchMessage fn -> findMessage fn [] messages + MatchSTM stm -> fmap Just stm + findMessage fn acc = \case + [] -> retry + (x : xs) -> case fn x of + Nothing -> findMessage fn (x : acc) xs + Just a -> do + writeTVar (queueMessages queue) (reverse acc ++ xs) + pure (Just a) + handleNewMessages = foldr orElse (storeMessage >> retry) $ flip map matches $ \case + MatchMessage fn -> + tryReadTQueue (queueTQueue queue) >>= \case + Nothing -> retry + Just msg -> case fn msg of + Nothing -> do + unGetTQueue (queueTQueue queue) msg + retry + Just a -> pure a + MatchSTM stm -> stm + storeMessage = do + mmsg <- tryReadTQueue (queueTQueue queue) + case mmsg of + Nothing -> pure () + Just msg -> modifyTVar' (queueMessages queue) (\old -> old ++ [msg]) +{-# SPECIALIZE dequeue :: Queue CCS.STM a -> [Match CCS.STM a b] -> IO b #-} diff --git a/troupe/test/Troupe/Queue/Test.hs b/troupe/test/Troupe/Queue/Test.hs new file mode 100644 index 0000000..50ec3ef --- /dev/null +++ b/troupe/test/Troupe/Queue/Test.hs @@ -0,0 +1,122 @@ +module Troupe.Queue.Test (tests) where + +import Control.Concurrent.Classy (threadDelay) +import Control.Concurrent.Classy.Async (withAsync) +import Control.Concurrent.Classy.STM (check, readTVar, registerDelay, throwSTM) +import Control.Exception.Safe (Exception, try) +import Control.Monad.Conc.Class (atomically) +import Test.Tasty (TestTree, testGroup) +import Test.Tasty.DejaFu (testAuto) +import Test.Tasty.HUnit (testCase, (@?=)) +import Troupe.Queue (Match (..), dequeue, enqueue, newQueue) + +tests :: TestTree +tests = + testGroup + "Troupe.Queue" + [ testCase "Simple" $ do + q <- atomically newQueue + atomically $ enqueue q () + r <- dequeue q [MatchMessage (\() -> pure True)] + r @?= True, + testCase "Less simple" $ do + q <- atomically newQueue + d <- registerDelay 1000 + r <- dequeue q [MatchSTM (readTVar d >>= check)] + r @?= (), + testCase "Another one" $ do + q <- atomically newQueue + d <- registerDelay 100000 + atomically $ enqueue q () + r <- dequeue q [MatchMessage (\() -> pure True), MatchSTM (do readTVar d >>= check; pure False)] + r @?= True, + testCase "Another one" $ do + q <- atomically newQueue + d <- registerDelay 100000 + atomically $ enqueue q () + r <- dequeue q [MatchSTM (do readTVar d >>= check; pure False), MatchMessage (\() -> pure True)] + r @?= True, + testCase "Old ones" $ do + q <- atomically newQueue + atomically $ enqueue q True + atomically $ enqueue q False + dequeue q [MatchMessage (\b -> if not b then Just () else Nothing)] + dequeue q [MatchMessage (\b -> if b then Just () else Nothing)], + testCase "First one wins" $ do + q <- atomically newQueue + r <- dequeue q [MatchSTM (pure True), MatchSTM (pure False)] + r @?= True, + testCase "first one still wins" $ do + q <- atomically newQueue + atomically $ enqueue q True + r <- + dequeue + q + [ MatchMessage (\b -> if not b then Just (0 :: Int) else Nothing), + MatchMessage (\b -> if b then Just 1 else Nothing), + MatchMessage (\b -> if not b then Just 2 else Nothing), + MatchMessage (\b -> if b then Just 3 else Nothing) + ] + r @?= 1, + testCase "a" $ do + q <- atomically newQueue + atomically $ enqueue q True + r <- dequeue q [MatchSTM (pure False), MatchMessage Just] + r @?= False + r' <- dequeue q [MatchMessage Just, MatchSTM (pure False)] + r' @?= True, + testAuto "What gives..." $ do + q <- atomically newQueue + atomically $ do + enqueue q (1 :: Int) + enqueue q 2 + dequeue + q + [ MatchMessage (\i -> if i == 2 then Just i else Nothing), + MatchMessage (\i -> if i == 3 then Just i else Nothing), + MatchMessage (\i -> if i == 1 then Just i else Nothing) + ], + testAuto "Another" $ do + q <- atomically newQueue + dequeue q [MatchMessage (\() -> pure True), MatchSTM (pure False)], + testAuto "W" $ do + q <- atomically $ do + q <- newQueue + enqueue q () + pure q + dequeue q [MatchMessage (\() -> pure True), MatchSTM (pure False)], + testAuto "MT" $ do + q <- atomically newQueue + withAsync (threadDelay 100000 >> atomically (enqueue q (1 :: Int))) $ \_a -> + withAsync (threadDelay 1000 >> atomically (enqueue q 2)) $ \_b -> do + -- Note, we don't synchronise on `a` or `b`, so the `dequeue` returns + -- either 1 or 2 + r <- + dequeue + q + [ MatchMessage (\i -> if i == 1 then Just i else Nothing), + MatchMessage (\i -> if i == 2 then Just i else Nothing) + ] + pure $ r == 1 || r == 2, + testCase "throw" $ do + q <- atomically newQueue + atomically $ enqueue q (1 :: Int) + + r <- + try $ + dequeue + q + [ MatchMessage (\i -> if i == 0 then Just i else Nothing), + MatchSTM (throwSTM TestException), + MatchMessage Just + ] + r @?= Left TestException + + r' <- dequeue q [MatchMessage Just] + r' @?= 1 + ] + +data TestException = TestException + deriving (Show, Eq) + +instance Exception TestException diff --git a/troupe/test/troupe-test.hs b/troupe/test/troupe-test.hs index 1bd4d60..da12271 100644 --- a/troupe/test/troupe-test.hs +++ b/troupe/test/troupe-test.hs @@ -1,6 +1,7 @@ module Main (main) where import Test.Tasty (defaultMain, testGroup) +import qualified Troupe.Queue.Test as TQ import qualified Troupe.Test as T main :: IO () @@ -8,5 +9,6 @@ main = defaultMain $ testGroup "troupe-test" - [ T.tests + [ T.tests, + TQ.tests ] diff --git a/troupe/troupe.cabal b/troupe/troupe.cabal index 5a9beac..13a67be 100644 --- a/troupe/troupe.cabal +++ b/troupe/troupe.cabal @@ -41,11 +41,13 @@ common warnings library import: warnings - exposed-modules: Troupe + exposed-modules: + Troupe + Troupe.Queue + other-modules: Troupe.Exceptions Troupe.Process - Troupe.Queue Troupe.Types build-depends: @@ -94,13 +96,19 @@ test-suite troupe-test type: exitcode-stdio-1.0 hs-source-dirs: test main-is: troupe-test.hs - other-modules: Troupe.Test + other-modules: + Troupe.Queue.Test + Troupe.Test + ghc-options: -rtsopts -threaded -with-rtsopts=-N2 build-depends: , base ^>=4.17.0.0 || ^>=4.18.0.0 + , concurrency ^>=1.11.0.2 , deepseq ^>=1.4.8.0 , safe-exceptions ^>=0.1.7.3 + , stm ^>=2.5.1.0 , tasty ^>=1.4.3 + , tasty-dejafu ^>=2.1.0.0 , tasty-hunit ^>=0.10.0.3 , transformers ^>=0.5.6.2 || ^>=0.6.1.0 , troupe From 323be8ad7252d1057527e2e658b74831cba28c61 Mon Sep 17 00:00:00 2001 From: Nicolas Trangez Date: Mon, 27 Mar 2023 18:00:02 +0200 Subject: [PATCH 3/8] Stash --- troupe/test/Troupe/Queue/Test.hs | 3 +++ troupe/troupe.cabal | 1 - 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/troupe/test/Troupe/Queue/Test.hs b/troupe/test/Troupe/Queue/Test.hs index 50ec3ef..4a72387 100644 --- a/troupe/test/Troupe/Queue/Test.hs +++ b/troupe/test/Troupe/Queue/Test.hs @@ -79,6 +79,9 @@ tests = testAuto "Another" $ do q <- atomically newQueue dequeue q [MatchMessage (\() -> pure True), MatchSTM (pure False)], + testAuto "F" $ do + q <- atomically newQueue + dequeue q [MatchSTM (pure (1 :: Int)), MatchSTM (pure 2), MatchSTM (pure 3)], testAuto "W" $ do q <- atomically $ do q <- newQueue diff --git a/troupe/troupe.cabal b/troupe/troupe.cabal index 13a67be..1fbb10f 100644 --- a/troupe/troupe.cabal +++ b/troupe/troupe.cabal @@ -106,7 +106,6 @@ test-suite troupe-test , concurrency ^>=1.11.0.2 , deepseq ^>=1.4.8.0 , safe-exceptions ^>=0.1.7.3 - , stm ^>=2.5.1.0 , tasty ^>=1.4.3 , tasty-dejafu ^>=2.1.0.0 , tasty-hunit ^>=0.10.0.3 From 9bf2d760f4bfb25fc156a775e15cbaab453168b5 Mon Sep 17 00:00:00 2001 From: Nicolas Trangez Date: Mon, 27 Mar 2023 18:01:31 +0200 Subject: [PATCH 4/8] Stash --- troupe/src/Troupe/Queue.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/troupe/src/Troupe/Queue.hs b/troupe/src/Troupe/Queue.hs index f5e08c6..a3cd3f4 100644 --- a/troupe/src/Troupe/Queue.hs +++ b/troupe/src/Troupe/Queue.hs @@ -42,7 +42,7 @@ newQueue = {-# SPECIALIZE newQueue :: CCS.STM (Queue CCS.STM a) #-} enqueue :: (MonadSTM stm) => Queue stm a -> a -> stm () -enqueue queue a = writeTQueue (queueTQueue queue) a +enqueue queue = writeTQueue (queueTQueue queue) {-# INLINE enqueue #-} {-# SPECIALIZE enqueue :: Queue CCS.STM a -> a -> CCS.STM () #-} @@ -61,7 +61,7 @@ dequeue queue matches = do where getMessages = do newMessages <- flushTQueue (queueTQueue queue) - modifyTVar' (queueMessages queue) (\old -> old ++ newMessages) + modifyTVar' (queueMessages queue) (++ newMessages) handleExistingMessages = do messages <- readTVar (queueMessages queue) foldr orElse (pure Nothing) $ flip map matches $ \case From 327be5015577f0ae3bd4eaa255935898bb3524bb Mon Sep 17 00:00:00 2001 From: Nicolas Trangez Date: Mon, 27 Mar 2023 18:01:55 +0200 Subject: [PATCH 5/8] Stash --- cabal.project | 1 + 1 file changed, 1 insertion(+) diff --git a/cabal.project b/cabal.project index 7ccc2f6..4a8682d 100644 --- a/cabal.project +++ b/cabal.project @@ -4,6 +4,7 @@ Packages: Allow-Newer: focus-1.0.3:transformers, stm-hamt-1.2.0.9:transformers, + concurrency-1.11.0.2:mtl, Constraints: troupe +werror, From 79fd5d172b856c8171f26f28c6c4975060a0d6e1 Mon Sep 17 00:00:00 2001 From: Nicolas Trangez Date: Mon, 27 Mar 2023 18:30:12 +0200 Subject: [PATCH 6/8] Stash --- .github/workflows/haskell-ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/haskell-ci.yml b/.github/workflows/haskell-ci.yml index 3138c25..0864c43 100644 --- a/.github/workflows/haskell-ci.yml +++ b/.github/workflows/haskell-ci.yml @@ -168,6 +168,7 @@ jobs: constraints: troupe +werror allow-newer: focus-1.0.3:transformers allow-newer: stm-hamt-1.2.0.9:transformers + allow-newer: concurrency-1.11.0.2:mtl EOF $HCPKG list --simple-output --names-only | perl -ne 'for (split /\s+/) { print "constraints: $_ installed\n" unless /^(troupe)$/; }' >> cabal.project.local cat cabal.project From c656ccc53293669c8f4d67985974a9696e6d0a8e Mon Sep 17 00:00:00 2001 From: Nicolas Trangez Date: Mon, 27 Mar 2023 18:34:54 +0200 Subject: [PATCH 7/8] Stash --- .github/workflows/haskell-ci.yml | 1 + cabal.project | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/haskell-ci.yml b/.github/workflows/haskell-ci.yml index 0864c43..43cb1a0 100644 --- a/.github/workflows/haskell-ci.yml +++ b/.github/workflows/haskell-ci.yml @@ -169,6 +169,7 @@ jobs: allow-newer: focus-1.0.3:transformers allow-newer: stm-hamt-1.2.0.9:transformers allow-newer: concurrency-1.11.0.2:mtl + allow-newer: concurrency-1.11.0.2:transformers EOF $HCPKG list --simple-output --names-only | perl -ne 'for (split /\s+/) { print "constraints: $_ installed\n" unless /^(troupe)$/; }' >> cabal.project.local cat cabal.project diff --git a/cabal.project b/cabal.project index 4a8682d..b503d6c 100644 --- a/cabal.project +++ b/cabal.project @@ -5,6 +5,7 @@ Allow-Newer: focus-1.0.3:transformers, stm-hamt-1.2.0.9:transformers, concurrency-1.11.0.2:mtl, + concurrency-1.11.0.2:transformers, Constraints: troupe +werror, From af29756f1ad23c7332724e78627657d0f7845052 Mon Sep 17 00:00:00 2001 From: Nicolas Trangez Date: Mon, 27 Mar 2023 18:45:34 +0200 Subject: [PATCH 8/8] Stash --- .github/workflows/haskell-ci.yml | 1 + cabal.project | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/haskell-ci.yml b/.github/workflows/haskell-ci.yml index 43cb1a0..caea6f2 100644 --- a/.github/workflows/haskell-ci.yml +++ b/.github/workflows/haskell-ci.yml @@ -170,6 +170,7 @@ jobs: allow-newer: stm-hamt-1.2.0.9:transformers allow-newer: concurrency-1.11.0.2:mtl allow-newer: concurrency-1.11.0.2:transformers + allow-newer: dejafu-2.4.0.4:transformers EOF $HCPKG list --simple-output --names-only | perl -ne 'for (split /\s+/) { print "constraints: $_ installed\n" unless /^(troupe)$/; }' >> cabal.project.local cat cabal.project diff --git a/cabal.project b/cabal.project index b503d6c..d32ef8b 100644 --- a/cabal.project +++ b/cabal.project @@ -6,6 +6,7 @@ Allow-Newer: stm-hamt-1.2.0.9:transformers, concurrency-1.11.0.2:mtl, concurrency-1.11.0.2:transformers, + dejafu-2.4.0.4:transformers, Constraints: troupe +werror,