Skip to content

Commit fac5505

Browse files
committed
Reimplement distribution metric with GHC prim ops
This commit attempts to address issue haskell-github-trust#41 of tibbe/ekg-core by replacing the C code for the distribution metric with GHC prim ops. The performance of this implementation is about half that of the existing C code in a single-threaded benchmark; without masking the performance is comparable. This commit is based on the work of Travis Whitaker in PR haskell-github-trust#42 of tibbe/ekg-core.
1 parent 8daa77a commit fac5505

File tree

2 files changed

+226
-168
lines changed

2 files changed

+226
-168
lines changed

System/Metrics/Distribution.hs

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
{-# LANGUAGE BangPatterns #-}
2+
{-# LANGUAGE CPP #-}
3+
{-# LANGUAGE MagicHash #-}
4+
{-# LANGUAGE TypeApplications #-}
5+
{-# LANGUAGE UnboxedTuples #-}
6+
7+
module System.Metrics.Distribution
8+
( Distribution
9+
, new
10+
, add
11+
, addN
12+
, read
13+
14+
-- * Gathered statistics
15+
, Internal.Stats
16+
, Internal.mean
17+
, Internal.variance
18+
, Internal.count
19+
, Internal.sum
20+
, Internal.min
21+
, Internal.max
22+
) where
23+
24+
import qualified Prelude
25+
import Prelude hiding (max, min, read, sum)
26+
27+
import Control.Exception (assert)
28+
import Control.Monad (forM_, replicateM)
29+
import qualified Data.Array as A
30+
import Data.Primitive.ByteArray
31+
import Data.Primitive.MachDeps (sIZEOF_INT)
32+
import GHC.Int
33+
import GHC.IO
34+
import GHC.Prim
35+
36+
import qualified System.Metrics.Distribution.Internal as Internal
37+
import System.Metrics.ThreadId
38+
39+
------------------------------------------------------------------------
40+
41+
-- | An metric for tracking events.
42+
newtype Distribution = Distribution { unD :: A.Array Distrib }
43+
44+
-- | Number of lock stripes. Should be greater or equal to the number
45+
-- of HECs.
46+
numStripes :: Int
47+
numStripes = 8
48+
49+
-- | Get the stripe to use for this thread.
50+
myStripe :: Distribution -> IO Distrib
51+
myStripe distrib = do
52+
tid <- myCapability
53+
return $! unD distrib `A.index` (tid `mod` numStripes)
54+
55+
------------------------------------------------------------------------
56+
57+
newtype Distrib = Distrib (MutableByteArray RealWorld)
58+
59+
sIZEOF_CACHELINE :: Int
60+
sIZEOF_CACHELINE = 64
61+
{-# INLINE sIZEOF_CACHELINE #-}
62+
63+
posLock, posCount, posMean, posSumSqDelta, posSum, posMin, posMax :: Int
64+
-- Putting the variable-sized `Int` field first so that its offset (0)
65+
-- does not depend on its size. This assumes that the size of `Int` is
66+
-- at most 8 bytes.
67+
posLock = 0 -- Int
68+
posCount = 1 -- Int64
69+
posMean = 2 -- Double
70+
posSumSqDelta = 3 -- Double
71+
posSum = 4 -- Double
72+
posMin = 5 -- Double
73+
posMax = 6 -- Double
74+
75+
newDistrib :: IO Distrib
76+
newDistrib = do
77+
arr <- newAlignedPinnedByteArray sIZEOF_CACHELINE sIZEOF_CACHELINE
78+
79+
writeByteArray @Int arr posLock 0
80+
writeByteArray @Int64 arr posCount 0
81+
writeByteArray @Double arr posMean 0.0
82+
writeByteArray @Double arr posSumSqDelta 0.0
83+
writeByteArray @Double arr posSum 0.0
84+
writeByteArray @Double arr posMin inf
85+
writeByteArray @Double arr posMax (-inf)
86+
87+
pure $ Distrib arr
88+
where
89+
inf :: Double
90+
inf = 1/0
91+
92+
withLock :: Distrib -> IO () -> IO ()
93+
withLock distrib action = mask_ $ do
94+
lock distrib
95+
action
96+
unlock distrib
97+
98+
lock :: Distrib -> IO ()
99+
lock (Distrib (MutableByteArray arr#)) = IO $ \s0# ->
100+
case spinlock arr# s0# of
101+
s1# -> (# s1#, () #)
102+
103+
spinlock
104+
:: MutableByteArray# RealWorld -> State# RealWorld -> State# RealWorld
105+
spinlock arr# s0# =
106+
case posLock of { I# posLock# ->
107+
case casIntArray# arr# posLock# 0# 1# s0# of { (# s1#, r# #) ->
108+
case r# of
109+
0# -> s1#
110+
_ -> spinlock arr# s1#
111+
}}
112+
113+
unlock :: Distrib -> IO ()
114+
unlock (Distrib arr) = writeByteArray @Int arr posLock 0
115+
116+
-- Mean and variance are computed according to
117+
-- http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
118+
distribAddN :: Distrib -> Double -> Int64 -> IO ()
119+
distribAddN distrib val n = do
120+
let n' = fromIntegral n :: Double
121+
withLock distrib $ do
122+
let Distrib arr = distrib
123+
124+
oldCount <- readByteArray @Int64 arr posCount
125+
oldMean <- readByteArray @Double arr posMean
126+
oldSumSqDelta <- readByteArray @Double arr posSumSqDelta
127+
oldSum <- readByteArray @Double arr posSum
128+
oldMin <- readByteArray @Double arr posMin
129+
oldMax <- readByteArray @Double arr posMax
130+
131+
let newCount = oldCount + n
132+
delta = val - oldMean
133+
newMean = oldMean + n' * delta / fromIntegral newCount
134+
newSumSqDelta = oldSumSqDelta + delta * (val - newMean) * n'
135+
newSum = oldSum + val -- Shouldn't this be `oldSum + n'*val`?
136+
newMin = Prelude.min oldMin val
137+
newMax = Prelude.max oldMax val
138+
139+
writeByteArray @Int64 arr posCount newCount
140+
writeByteArray @Double arr posMean newMean
141+
writeByteArray @Double arr posSumSqDelta newSumSqDelta
142+
writeByteArray @Double arr posSum newSum
143+
writeByteArray @Double arr posMin newMin
144+
writeByteArray @Double arr posMax newMax
145+
146+
-- http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
147+
distribCombine :: Distrib -> Distrib -> IO ()
148+
distribCombine distrib (Distrib accArr) =
149+
withLock distrib $ do
150+
let Distrib arr = distrib
151+
152+
count <- readByteArray @Int64 arr posCount
153+
mean <- readByteArray @Double arr posMean
154+
sumSqDelta <- readByteArray @Double arr posSumSqDelta
155+
sum <- readByteArray @Double arr posSum
156+
min <- readByteArray @Double arr posMin
157+
max <- readByteArray @Double arr posMax
158+
159+
accCount <- readByteArray @Int64 accArr posCount
160+
accMean <- readByteArray @Double accArr posMean
161+
accSumSqDelta <- readByteArray @Double accArr posSumSqDelta
162+
accSum <- readByteArray @Double accArr posSum
163+
accMin <- readByteArray @Double accArr posMin
164+
accMax <- readByteArray @Double accArr posMax
165+
166+
let newCount = count + accCount
167+
delta = mean - accMean
168+
count' = fromIntegral count
169+
countAcc' = fromIntegral accCount
170+
newCount' = fromIntegral newCount
171+
newMean = (countAcc' * accMean + count' * mean) / newCount'
172+
newSumSqDelta = accSumSqDelta + sumSqDelta +
173+
delta * delta * (countAcc' * count') / newCount'
174+
newSum = sum + accSum
175+
newMin = Prelude.min min accMin
176+
newMax = Prelude.max max accMax
177+
178+
writeByteArray @Int64 accArr posCount newCount
179+
writeByteArray @Double accArr posMean newMean
180+
writeByteArray @Double accArr posSumSqDelta newSumSqDelta
181+
writeByteArray @Double accArr posSum newSum
182+
writeByteArray @Double accArr posMin newMin
183+
writeByteArray @Double accArr posMax newMax
184+
185+
------------------------------------------------------------------------
186+
-- Exposed API
187+
188+
-- | Create a new distribution.
189+
new :: IO Distribution
190+
new = (Distribution . A.fromList numStripes) `fmap`
191+
replicateM numStripes newDistrib
192+
193+
-- | Add a value to the distribution.
194+
add :: Distribution -> Double -> IO ()
195+
add distrib val = addN distrib val 1
196+
197+
-- | Add the same value to the distribution N times.
198+
addN :: Distribution -> Double -> Int64 -> IO ()
199+
addN distribution val n = do
200+
distrib <- myStripe distribution
201+
distribAddN distrib val n
202+
203+
-- | Get the current statistical summary for the event being tracked.
204+
read :: Distribution -> IO Internal.Stats
205+
read distribution = do
206+
result <- newDistrib
207+
forM_ (A.toList $ unD distribution) $ \distrib ->
208+
distribCombine distrib result
209+
210+
let Distrib arr = result
211+
count <- readByteArray @Int64 arr posCount
212+
mean <- readByteArray @Double arr posMean
213+
sumSqDelta <- readByteArray @Double arr posSumSqDelta
214+
sum <- readByteArray @Double arr posSum
215+
min <- readByteArray @Double arr posMin
216+
max <- readByteArray @Double arr posMax
217+
218+
pure $! Internal.Stats
219+
{ Internal.mean = if count == 0 then 0.0 else mean
220+
, Internal.variance = if count == 0 then 0.0
221+
else sumSqDelta / fromIntegral count
222+
, Internal.count = count
223+
, Internal.sum = sum
224+
, Internal.min = if count == 0 then 0.0 else min
225+
, Internal.max = if count == 0 then 0.0 else max
226+
}

System/Metrics/Distribution.hsc

Lines changed: 0 additions & 168 deletions
This file was deleted.

0 commit comments

Comments
 (0)