Skip to content

WIP: Sketch primops-based atomic counter and distribution implementation #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 41 additions & 31 deletions Data/Atomic.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
{-# LANGUAGE BangPatterns, ForeignFunctionInterface #-}
{-# LANGUAGE BangPatterns
, CPP
, ForeignFunctionInterface
, MagicHash
, UnboxedTuples
#-}
-- | An atomic integer value. All operations are thread safe.
module Data.Atomic
(
Expand All @@ -12,33 +17,41 @@ module Data.Atomic
, subtract
) where

import Data.Int (Int64)
import Foreign.ForeignPtr (ForeignPtr, mallocForeignPtr, withForeignPtr)
import Foreign.Ptr (Ptr)
import Foreign.Storable (poke)
import Prelude hiding (read, subtract)

import GHC.Int
import GHC.IO
import GHC.Prim

#include "MachDeps.h"

#if WORD_SIZE_IN_BYTES > 32
#define ARRLEN 8
#else
#define ARRLEN 4
#endif

-- | A mutable, atomic integer.
newtype Atomic = C (ForeignPtr Int64)
--newtype Atomic = C (ForeignPtr Int64)
data Atomic = C (MutableByteArray# RealWorld)

-- | Create a new, zero initialized, atomic.
new :: Int64 -> IO Atomic
new n = do
fp <- mallocForeignPtr
withForeignPtr fp $ \ p -> poke p n
return $ C fp
new :: Int -> IO Atomic
new (I# n) = IO $ \s ->
case newByteArray# ARRLEN# s of { (# s1, mba #) ->
case atomicWriteIntArray# mba 0# n s1 of { s2 ->
(# s2, C mba #) }}

read :: Atomic -> IO Int64
read (C fp) = withForeignPtr fp cRead

foreign import ccall unsafe "hs_atomic_read" cRead :: Ptr Int64 -> IO Int64
read :: Atomic -> IO Int
read (C mba) = IO $ \s ->
case atomicReadIntArray# mba 0# s of { (# s1, n #) ->
(# s1, I# n #)}

-- | Set the atomic to the given value.
write :: Atomic -> Int64 -> IO ()
write (C fp) n = withForeignPtr fp $ \ p -> cWrite p n

foreign import ccall unsafe "hs_atomic_write" cWrite
:: Ptr Int64 -> Int64 -> IO ()
write :: Atomic -> Int -> IO ()
write (C mba) (I# n) = IO $ \s ->
case atomicWriteIntArray# mba 0# n s of { s1 ->
(# s1, () #) }

-- | Increase the atomic by one.
inc :: Atomic -> IO ()
Expand All @@ -49,16 +62,13 @@ dec :: Atomic -> IO ()
dec atomic = subtract atomic 1

-- | Increase the atomic by the given amount.
add :: Atomic -> Int64 -> IO ()
add (C fp) n = withForeignPtr fp $ \ p -> cAdd p n
add :: Atomic -> Int -> IO ()
add (C mba) (I# n) = IO $ \s ->
case fetchAddIntArray# mba 0# n s of { (# s1, _ #) ->
(# s1, () #) }

-- | Decrease the atomic by the given amount.
subtract :: Atomic -> Int64 -> IO ()
subtract (C fp) n = withForeignPtr fp $ \ p -> cSubtract p n

-- | Increase the atomic by the given amount.
foreign import ccall unsafe "hs_atomic_add" cAdd :: Ptr Int64 -> Int64 -> IO ()

-- | Increase the atomic by the given amount.
foreign import ccall unsafe "hs_atomic_subtract" cSubtract
:: Ptr Int64 -> Int64 -> IO ()
subtract :: Atomic -> Int -> IO ()
subtract (C mba) (I# n) = IO $ \s ->
case fetchSubIntArray# mba 0# n s of { (# s1, _ #) ->
(# s1, () #) }
41 changes: 20 additions & 21 deletions System/Metrics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ module System.Metrics

import Control.Applicative ((<$>))
import Control.Monad (forM)
import Data.Int (Int64)
import qualified Data.IntMap.Strict as IM
import Data.IORef (IORef, atomicModifyIORef, newIORef, readIORef)
import qualified Data.HashMap.Strict as M
Expand Down Expand Up @@ -133,8 +132,8 @@ data GroupSampler = forall a. GroupSampler
}

-- TODO: Rename this to Metric and Metric to SampledMetric.
data MetricSampler = CounterS !(IO Int64)
| GaugeS !(IO Int64)
data MetricSampler = CounterS !(IO Int)
| GaugeS !(IO Int)
| LabelS !(IO T.Text)
| DistributionS !(IO Distribution.Stats)

Expand All @@ -156,18 +155,18 @@ newStore = do
-- | Register a non-negative, monotonically increasing, integer-valued
-- metric. The provided action to read the value must be thread-safe.
-- Also see 'createCounter'.
registerCounter :: T.Text -- ^ Counter name
-> IO Int64 -- ^ Action to read the current metric value
-> Store -- ^ Metric store
registerCounter :: T.Text -- ^ Counter name
-> IO Int -- ^ Action to read the current metric value
-> Store -- ^ Metric store
-> IO ()
registerCounter name sample store =
register name (CounterS sample) store

-- | Register an integer-valued metric. The provided action to read
-- the value must be thread-safe. Also see 'createGauge'.
registerGauge :: T.Text -- ^ Gauge name
-> IO Int64 -- ^ Action to read the current metric value
-> Store -- ^ Metric store
registerGauge :: T.Text -- ^ Gauge name
-> IO Int -- ^ Action to read the current metric value
-> Store -- ^ Metric store
-> IO ()
registerGauge name sample store =
register name (GaugeS sample) store
Expand Down Expand Up @@ -333,11 +332,11 @@ createDistribution name store = do

#if MIN_VERSION_base(4,10,0)
-- | Convert nanoseconds to milliseconds.
nsToMs :: Int64 -> Int64
nsToMs :: Int -> Int
nsToMs s = round (realToFrac s / (1000000.0 :: Double))
#else
-- | Convert seconds to milliseconds.
sToMs :: Double -> Int64
sToMs :: Double -> Int
sToMs s = round (s * 1000.0)
#endif

Expand Down Expand Up @@ -430,15 +429,15 @@ registerGcMetrics store =
, ("rts.gc.cumulative_bytes_used" , Counter . fromIntegral . Stats.cumulative_live_bytes)
, ("rts.gc.bytes_copied" , Counter . fromIntegral . Stats.copied_bytes)
#if MIN_VERSION_base(4,12,0)
, ("rts.gc.init_cpu_ms" , Counter . nsToMs . Stats.init_cpu_ns)
, ("rts.gc.init_wall_ms" , Counter . nsToMs . Stats.init_elapsed_ns)
, ("rts.gc.init_cpu_ms" , Counter . nsToMs . fromIntegral . Stats.init_cpu_ns)
, ("rts.gc.init_wall_ms" , Counter . nsToMs . fromIntegral . Stats.init_elapsed_ns)
#endif
, ("rts.gc.mutator_cpu_ms" , Counter . nsToMs . Stats.mutator_cpu_ns)
, ("rts.gc.mutator_wall_ms" , Counter . nsToMs . Stats.mutator_elapsed_ns)
, ("rts.gc.gc_cpu_ms" , Counter . nsToMs . Stats.gc_cpu_ns)
, ("rts.gc.gc_wall_ms" , Counter . nsToMs . Stats.gc_elapsed_ns)
, ("rts.gc.cpu_ms" , Counter . nsToMs . Stats.cpu_ns)
, ("rts.gc.wall_ms" , Counter . nsToMs . Stats.elapsed_ns)
, ("rts.gc.mutator_cpu_ms" , Counter . nsToMs . fromIntegral . Stats.mutator_cpu_ns)
, ("rts.gc.mutator_wall_ms" , Counter . nsToMs . fromIntegral . Stats.mutator_elapsed_ns)
, ("rts.gc.gc_cpu_ms" , Counter . nsToMs . fromIntegral . Stats.gc_cpu_ns)
, ("rts.gc.gc_wall_ms" , Counter . nsToMs . fromIntegral . Stats.gc_elapsed_ns)
, ("rts.gc.cpu_ms" , Counter . nsToMs . fromIntegral . Stats.cpu_ns)
, ("rts.gc.wall_ms" , Counter . nsToMs . fromIntegral . Stats.elapsed_ns)
, ("rts.gc.max_bytes_used" , Gauge . fromIntegral . Stats.max_live_bytes)
, ("rts.gc.current_bytes_used" , Gauge . fromIntegral . Stats.gcdetails_live_bytes . Stats.gc)
, ("rts.gc.current_bytes_slop" , Gauge . fromIntegral . Stats.gcdetails_slop_bytes . Stats.gc)
Expand Down Expand Up @@ -615,8 +614,8 @@ sampleGroups cbSamplers = concat `fmap` sequence (map runOne cbSamplers)
return $! map (\ (n, f) -> (n, f a)) (M.toList groupSamplerMetrics)

-- | The value of a sampled metric.
data Value = Counter {-# UNPACK #-} !Int64
| Gauge {-# UNPACK #-} !Int64
data Value = Counter {-# UNPACK #-} !Int
| Gauge {-# UNPACK #-} !Int
| Label {-# UNPACK #-} !T.Text
| Distribution !Distribution.Stats
deriving (Eq, Show)
Expand Down
5 changes: 2 additions & 3 deletions System/Metrics/Counter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ module System.Metrics.Counter
) where

import qualified Data.Atomic as Atomic
import Data.Int (Int64)
import Prelude hiding (read)

-- | A mutable, integer-valued counter.
Expand All @@ -23,13 +22,13 @@ new :: IO Counter
new = C `fmap` Atomic.new 0

-- | Get the current value of the counter.
read :: Counter -> IO Int64
read :: Counter -> IO Int
read = Atomic.read . unC

-- | Increase the counter by one.
inc :: Counter -> IO ()
inc counter = add counter 1

-- | Add the argument to the counter.
add :: Counter -> Int64 -> IO ()
add :: Counter -> Int -> IO ()
add counter = Atomic.add (unC counter)
Loading