Skip to content

Use Primops-based atomic counter and distribution implementation #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Jun 24, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0433c5b
Bump base constraint.
TravisWhitaker Mar 6, 2020
420f559
Prototype atomic-memory-safe implementation.
TravisWhitaker Jul 19, 2020
2ac505c
Dependency version bumps.
TravisWhitaker Dec 24, 2023
16954da
Merge branch 'bumps' of github.com:TravisWhitaker/ekg-core into bumps
TravisWhitaker Dec 24, 2023
a52ddad
Merge branch 'bumps' of github.com:TravisWhitaker/ekg-core into memor…
TravisWhitaker Dec 24, 2023
74f2239
Update .gitignore
TravisWhitaker Dec 24, 2023
12ab6a7
Fix CPP
TravisWhitaker Dec 24, 2023
a4b7ff9
Merge branch 'master' of github.com:haskell-github-trust/ekg-core int…
TravisWhitaker Jun 8, 2025
53d0986
Clean up a bit.
TravisWhitaker Jun 8, 2025
d76fd8f
Fix ghc-prim constraint
TravisWhitaker Jun 8, 2025
db37322
Fix with 8.0.x
TravisWhitaker Jun 8, 2025
56ca464
fix it harder
TravisWhitaker Jun 8, 2025
0fc6787
fix it harder
TravisWhitaker Jun 8, 2025
316a206
fix it harder
TravisWhitaker Jun 8, 2025
0653030
Fast way on 64-bit, slow way on 32-bit.
TravisWhitaker Jun 19, 2025
5d6ad40
Remove some unnecessary changes, preserve some of the old explanatory…
TravisWhitaker Jun 19, 2025
e5439d2
Make it work on wasm32-wasi and older GHCs
TravisWhitaker Jun 19, 2025
a12f5bd
Int64# was added in GHC 9.4.x
TravisWhitaker Jun 19, 2025
3d3fc98
Make it build on WASM again.
TravisWhitaker Jun 19, 2025
90391c4
Clean up once more
TravisWhitaker Jun 19, 2025
2921d8c
Don't have to destructure boxed constants everywhere.
TravisWhitaker Jun 19, 2025
a89e220
yield in spinLock unhappy path
TravisWhitaker Jun 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 41 additions & 31 deletions Data/Atomic.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
{-# LANGUAGE BangPatterns, ForeignFunctionInterface #-}
{-# LANGUAGE BangPatterns
, CPP
, ForeignFunctionInterface
, MagicHash
, UnboxedTuples
#-}
-- | An atomic integer value. All operations are thread safe.
module Data.Atomic
(
Expand All @@ -12,33 +17,41 @@ module Data.Atomic
, subtract
) where

import Data.Int (Int64)
import Foreign.ForeignPtr (ForeignPtr, mallocForeignPtr, withForeignPtr)
import Foreign.Ptr (Ptr)
import Foreign.Storable (poke)
import Prelude hiding (read, subtract)

import GHC.Int
import GHC.IO
import GHC.Prim

#include "MachDeps.h"

#if WORD_SIZE_IN_BYTES > 32
#define ARRLEN 8
#else
#define ARRLEN 4
#endif

-- | A mutable, atomic integer.
newtype Atomic = C (ForeignPtr Int64)
--newtype Atomic = C (ForeignPtr Int64)
data Atomic = C (MutableByteArray# RealWorld)

-- | Create a new, zero initialized, atomic.
new :: Int64 -> IO Atomic
new n = do
fp <- mallocForeignPtr
withForeignPtr fp $ \ p -> poke p n
return $ C fp
new :: Int -> IO Atomic
new (I# n) = IO $ \s ->
case newByteArray# ARRLEN# s of { (# s1, mba #) ->
case atomicWriteIntArray# mba 0# n s1 of { s2 ->
(# s2, C mba #) }}

read :: Atomic -> IO Int64
read (C fp) = withForeignPtr fp cRead

foreign import ccall unsafe "hs_atomic_read" cRead :: Ptr Int64 -> IO Int64
read :: Atomic -> IO Int
read (C mba) = IO $ \s ->
case atomicReadIntArray# mba 0# s of { (# s1, n #) ->
(# s1, I# n #)}

-- | Set the atomic to the given value.
write :: Atomic -> Int64 -> IO ()
write (C fp) n = withForeignPtr fp $ \ p -> cWrite p n

foreign import ccall unsafe "hs_atomic_write" cWrite
:: Ptr Int64 -> Int64 -> IO ()
write :: Atomic -> Int -> IO ()
write (C mba) (I# n) = IO $ \s ->
case atomicWriteIntArray# mba 0# n s of { s1 ->
(# s1, () #) }

-- | Increase the atomic by one.
inc :: Atomic -> IO ()
Expand All @@ -49,16 +62,13 @@ dec :: Atomic -> IO ()
dec atomic = subtract atomic 1

-- | Increase the atomic by the given amount.
add :: Atomic -> Int64 -> IO ()
add (C fp) n = withForeignPtr fp $ \ p -> cAdd p n
add :: Atomic -> Int -> IO ()
add (C mba) (I# n) = IO $ \s ->
case fetchAddIntArray# mba 0# n s of { (# s1, _ #) ->
(# s1, () #) }

-- | Decrease the atomic by the given amount.
subtract :: Atomic -> Int64 -> IO ()
subtract (C fp) n = withForeignPtr fp $ \ p -> cSubtract p n

-- | Increase the atomic by the given amount.
foreign import ccall unsafe "hs_atomic_add" cAdd :: Ptr Int64 -> Int64 -> IO ()

-- | Increase the atomic by the given amount.
foreign import ccall unsafe "hs_atomic_subtract" cSubtract
:: Ptr Int64 -> Int64 -> IO ()
subtract :: Atomic -> Int -> IO ()
subtract (C mba) (I# n) = IO $ \s ->
case fetchSubIntArray# mba 0# n s of { (# s1, _ #) ->
(# s1, () #) }
41 changes: 20 additions & 21 deletions System/Metrics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ module System.Metrics

import Control.Applicative ((<$>))
import Control.Monad (forM)
import Data.Int (Int64)
import qualified Data.IntMap.Strict as IM
import Data.IORef (IORef, atomicModifyIORef, newIORef, readIORef)
import qualified Data.HashMap.Strict as M
Expand Down Expand Up @@ -133,8 +132,8 @@ data GroupSampler = forall a. GroupSampler
}

-- TODO: Rename this to Metric and Metric to SampledMetric.
data MetricSampler = CounterS !(IO Int64)
| GaugeS !(IO Int64)
data MetricSampler = CounterS !(IO Int)
| GaugeS !(IO Int)
| LabelS !(IO T.Text)
| DistributionS !(IO Distribution.Stats)

Expand All @@ -156,18 +155,18 @@ newStore = do
-- | Register a non-negative, monotonically increasing, integer-valued
-- metric. The provided action to read the value must be thread-safe.
-- Also see 'createCounter'.
registerCounter :: T.Text -- ^ Counter name
-> IO Int64 -- ^ Action to read the current metric value
-> Store -- ^ Metric store
registerCounter :: T.Text -- ^ Counter name
-> IO Int -- ^ Action to read the current metric value
-> Store -- ^ Metric store
-> IO ()
registerCounter name sample store =
register name (CounterS sample) store

-- | Register an integer-valued metric. The provided action to read
-- the value must be thread-safe. Also see 'createGauge'.
registerGauge :: T.Text -- ^ Gauge name
-> IO Int64 -- ^ Action to read the current metric value
-> Store -- ^ Metric store
registerGauge :: T.Text -- ^ Gauge name
-> IO Int -- ^ Action to read the current metric value
-> Store -- ^ Metric store
-> IO ()
registerGauge name sample store =
register name (GaugeS sample) store
Expand Down Expand Up @@ -333,11 +332,11 @@ createDistribution name store = do

#if MIN_VERSION_base(4,10,0)
-- | Convert nanoseconds to milliseconds.
nsToMs :: Int64 -> Int64
nsToMs :: Int -> Int
nsToMs s = round (realToFrac s / (1000000.0 :: Double))
#else
-- | Convert seconds to milliseconds.
sToMs :: Double -> Int64
sToMs :: Double -> Int
sToMs s = round (s * 1000.0)
#endif

Expand Down Expand Up @@ -430,15 +429,15 @@ registerGcMetrics store =
, ("rts.gc.cumulative_bytes_used" , Counter . fromIntegral . Stats.cumulative_live_bytes)
, ("rts.gc.bytes_copied" , Counter . fromIntegral . Stats.copied_bytes)
#if MIN_VERSION_base(4,12,0)
, ("rts.gc.init_cpu_ms" , Counter . nsToMs . Stats.init_cpu_ns)
, ("rts.gc.init_wall_ms" , Counter . nsToMs . Stats.init_elapsed_ns)
, ("rts.gc.init_cpu_ms" , Counter . nsToMs . fromIntegral . Stats.init_cpu_ns)
, ("rts.gc.init_wall_ms" , Counter . nsToMs . fromIntegral . Stats.init_elapsed_ns)
#endif
, ("rts.gc.mutator_cpu_ms" , Counter . nsToMs . Stats.mutator_cpu_ns)
, ("rts.gc.mutator_wall_ms" , Counter . nsToMs . Stats.mutator_elapsed_ns)
, ("rts.gc.gc_cpu_ms" , Counter . nsToMs . Stats.gc_cpu_ns)
, ("rts.gc.gc_wall_ms" , Counter . nsToMs . Stats.gc_elapsed_ns)
, ("rts.gc.cpu_ms" , Counter . nsToMs . Stats.cpu_ns)
, ("rts.gc.wall_ms" , Counter . nsToMs . Stats.elapsed_ns)
, ("rts.gc.mutator_cpu_ms" , Counter . nsToMs . fromIntegral . Stats.mutator_cpu_ns)
, ("rts.gc.mutator_wall_ms" , Counter . nsToMs . fromIntegral . Stats.mutator_elapsed_ns)
, ("rts.gc.gc_cpu_ms" , Counter . nsToMs . fromIntegral . Stats.gc_cpu_ns)
, ("rts.gc.gc_wall_ms" , Counter . nsToMs . fromIntegral . Stats.gc_elapsed_ns)
, ("rts.gc.cpu_ms" , Counter . nsToMs . fromIntegral . Stats.cpu_ns)
, ("rts.gc.wall_ms" , Counter . nsToMs . fromIntegral . Stats.elapsed_ns)
, ("rts.gc.max_bytes_used" , Gauge . fromIntegral . Stats.max_live_bytes)
, ("rts.gc.current_bytes_used" , Gauge . fromIntegral . Stats.gcdetails_live_bytes . Stats.gc)
, ("rts.gc.current_bytes_slop" , Gauge . fromIntegral . Stats.gcdetails_slop_bytes . Stats.gc)
Expand Down Expand Up @@ -615,8 +614,8 @@ sampleGroups cbSamplers = concat `fmap` sequence (map runOne cbSamplers)
return $! map (\ (n, f) -> (n, f a)) (M.toList groupSamplerMetrics)

-- | The value of a sampled metric.
data Value = Counter {-# UNPACK #-} !Int64
| Gauge {-# UNPACK #-} !Int64
data Value = Counter {-# UNPACK #-} !Int
| Gauge {-# UNPACK #-} !Int
| Label {-# UNPACK #-} !T.Text
| Distribution !Distribution.Stats
deriving (Eq, Show)
Expand Down
5 changes: 2 additions & 3 deletions System/Metrics/Counter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ module System.Metrics.Counter
) where

import qualified Data.Atomic as Atomic
import Data.Int (Int64)
import Prelude hiding (read)

-- | A mutable, integer-valued counter.
Expand All @@ -23,13 +22,13 @@ new :: IO Counter
new = C `fmap` Atomic.new 0

-- | Get the current value of the counter.
read :: Counter -> IO Int64
read :: Counter -> IO Int
read = Atomic.read . unC

-- | Increase the counter by one.
inc :: Counter -> IO ()
inc counter = add counter 1

-- | Add the argument to the counter.
add :: Counter -> Int64 -> IO ()
add :: Counter -> Int -> IO ()
add counter = Atomic.add (unC counter)
Loading