Skip to content

Commit 3d03583

Browse files
Added example for parallel Monte-Carlson algorithm using MT2203 stream
1 parent 46da35e commit 3d03583

File tree

7 files changed

+454
-0
lines changed

7 files changed

+454
-0
lines changed

example/README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Parallel Monte-Carlo example
2+
3+
Using `mkl_random` package, we use MT-2203 family of pseudo-random number generation algorithms,
4+
we create workers, assign them RandomState objects with different members of the family of algorithms,
5+
and use multiprocessing Pool to distribute chunks of MC work to them to process.
6+
7+
Each worker gets `rs` and `n` arguments, `rs` representing RandomState object associated with the worker,
8+
and `n` being the size of the problem. `rs` is used to generate samples of size `n`, perform Monte-Carlo
9+
estimate(s) based on the sample and return.
10+
11+
After run is complete, a generator is returns that contains results of each worker.
12+
13+
This data is post-processed as necessary for the application.
14+
15+
## Stick triangle problem
16+
17+
Code is tested to estimate the probability that 3 segments, obtained by splitting a unit stick
18+
in two randomly chosen places, can be sides of a triangle. This probability is known in closed form to be $\frac{1}{4}$.
19+
20+
## Stick tetrahedron problem
21+
22+
Code is used to estimate the probability that 6 segments, obtained by splitting a unit stick in
23+
5 random chosen places, can be sides of a tetrahedron.
24+
25+
The probability is not known in closed form. See
26+
[math.stackexchange.com/questions/351913](https://math.stackexchange.com/questions/351913/probability-that-a-stick-randomly-broken-in-five-places-can-form-a-tetrahedron) for more details.

example/arg_parsing.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import argparse
2+
3+
__all__ = ['parse_arguments']
4+
5+
def pos_int(s):
6+
v = int(s)
7+
if v > 0:
8+
return v
9+
else:
10+
raise argparse.ArgumentTypeError('%r is not a positive integer' % s)
11+
12+
13+
def nonneg_int(s):
14+
v = int(s)
15+
if v >= 0:
16+
return v
17+
else:
18+
raise argparse.ArgumentTypeError('%r is not a non-negative integer' % s)
19+
20+
21+
def parse_arguments():
22+
argParser = argparse.ArgumentParser(
23+
prog="stick_tetrahedron.py",
24+
description="Monte-Carlo estimation of probability that 6 segments of a stick randomly broken in 5 places can form a tetrahedron.",
25+
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
26+
27+
argParser.add_argument('-s', '--seed', default=7777, type=pos_int, help="Random seed to initialize algorithms from MT2203 family")
28+
argParser.add_argument('-b', '--batch_size', default=65536, type=pos_int, help="Batch size for the Monte-Carlo run")
29+
argParser.add_argument('-n', '--batch_count', default=2048, type=pos_int, help="Number of batches executed in parallel")
30+
argParser.add_argument('-p', '--processes', default=-1, type=int, help="Number of processes used to execute batches")
31+
argParser.add_argument('-d', '--id_offset', default=0, type=nonneg_int, help="Offset for the MT2203/WH algorithms id")
32+
argParser.add_argument('-j', '--jump_size', default=0, type=nonneg_int, help="Jump size for skip-ahead")
33+
34+
args = argParser.parse_args()
35+
36+
return args

example/fancy.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import numpy as np
2+
import mkl_random as rnd
3+
4+
__doc__ = """
5+
Let's solve a classic problem of MC-estimating a probability that 3 segments of a unit stick randomly broken in 2 places can form a triangle.
6+
Let $u_1$ and $u_2$ be standard uniform random variables, denoting positions where the stick has been broken.
7+
8+
Let $w_1 = \min(u_1, u_2)$ and $w_2 = \max(u_1, u_2)$. Then, length of segments are $x_1 = w_1$, $x_2 = w_2-w_1$, $x_3 = 1-w_2$.
9+
These lengths must satisfy triangle inequality.
10+
11+
The closed form result is known to be $\frac{1}{4}$.
12+
13+
"""
14+
15+
def triangle_inequality(x1, x2, x3):
16+
"""Efficiently finds `np.less(x1,x2+x3)*np.less(x2,x1+x3)*np.less(x3,x1+x2)`"""
17+
tmp_sum = x2 + x3
18+
res = np.less(x1, tmp_sum) # x1 < x2 + x3
19+
np.add(x1, x3, out=tmp_sum)
20+
buf = np.less(x2, tmp_sum) # x2 < x1 + x3
21+
np.logical_and(res, buf, out=res)
22+
np.add(x1, x2, out=tmp_sum)
23+
np.less(x3, tmp_sum, out=buf) # x3 < x1 + x2
24+
np.logical_and(res, buf, out=res)
25+
return res
26+
27+
28+
def mc_dist(rs, n):
29+
"""Monte Carlo estimate of probability on sample of size `n`, using given random state object `rs`"""
30+
ws = np.sort(rs.rand(2,n), axis=0)
31+
x2 = np.empty(n, dtype=np.double)
32+
x3 = np.empty(n, dtype=np.double)
33+
34+
x1 = ws[0]
35+
np.subtract(ws[1], ws[0], out=x2)
36+
np.subtract(1, ws[1], out=x3)
37+
mc_prob = triangle_inequality(x1, x2, x3).sum() / n
38+
39+
return mc_prob
40+
41+
42+
def assign_worker_rs(w_rs):
43+
"""Assign process local random state variable `rs` the given value"""
44+
assert not 'rs' in globals(), "Here comes trouble. Process is not expected to have global variable `rs`"
45+
46+
global rs
47+
rs = w_rs
48+
# wait to ensure that the assignment takes place for each worker
49+
b.wait()
50+
51+
52+
def worker_compute(w_id):
53+
return mc_dist(rs, batch_size)
54+
55+
56+
if __name__ == '__main__':
57+
import multiprocessing as mp
58+
from itertools import repeat
59+
from timeit import default_timer as timer
60+
61+
seed = 77777
62+
n_workers = 12
63+
batch_size = 1024 * 256
64+
batches = 10000
65+
66+
t0 = timer()
67+
# Create instances of RandomState for each worker process from MT2203 family of generators
68+
rss = [ rnd.RandomState(seed, brng=('MT2203', idx)) for idx in range(n_workers) ]
69+
# use of Barrier ensures that every worker gets one
70+
b = mp.Barrier(n_workers)
71+
72+
with mp.Pool(processes=n_workers) as pool:
73+
# map over every worker once to distribute RandomState instances
74+
pool.map(assign_worker_rs, rss, chunksize=1)
75+
# Perform computations on workers
76+
r = pool.map(worker_compute, range(batches), chunksize=1)
77+
78+
# retrieve values of estimates into numpy array
79+
ps = np.fromiter(r, dtype=np.double)
80+
# compute sample estimator's mean and standard deviation
81+
p_est = ps.mean()
82+
pop_std = ps.std()
83+
t1 = timer()
84+
85+
dig = 3 - int(np.log10(pop_std))
86+
frm_str = "{0:0." + str(dig) + "f}"
87+
print(("Monte-Carlo estimate of probability: " + frm_str).format(p_est))
88+
print(("Population estimate of the estimator's standard deviation: " + frm_str).format(pop_std))
89+
print(("Expected standard deviation of the estimator: " + frm_str).format(np.sqrt(p_est * (1-p_est)/batch_size)))
90+
print("Execution time: {0:0.3f} seconds".format(t1-t0))

example/parallel_mc.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import multiprocessing as mp
2+
3+
__all__ = ['parallel_mc_run']
4+
5+
def worker_compute(w_id):
6+
"Worker function executed on the spawned slave process"
7+
# global _local_rs
8+
return _worker_mc_compute(_local_rs)
9+
10+
11+
def assign_worker_rs(w_rs):
12+
"""Assign process local random state variable `rs` the given value"""
13+
assert not '_local_rs' in globals(), "Here comes trouble. Process is not expected to have global variable `_local_rs`"
14+
15+
global _local_rs
16+
_local_rs = w_rs
17+
# wait to ensure that the assignment takes place for each worker
18+
b.wait()
19+
20+
def parallel_mc_run(random_states, n_workers, n_batches, mc_func):
21+
"""
22+
Given iterable `random_states` of length `n_workers`, the number of batches `n_batches`,
23+
and the function `worker_compute` to execute, return iterator with results returned by
24+
the supplied function. The function is expected to conform to signature f(worker_id),
25+
and has access to worker-local global variable `rs`, containing worker's random states.
26+
"""
27+
# use of Barrier ensures that every worker gets one
28+
global b, _worker_mc_compute
29+
b = mp.Barrier(n_workers)
30+
31+
_worker_mc_compute = mc_func
32+
with mp.Pool(processes=n_workers) as pool:
33+
# 1. map over every worker once to distribute RandomState instances
34+
pool.map(assign_worker_rs, random_states, chunksize=1)
35+
# 2. Perform computations on workers
36+
r = pool.map(worker_compute, range(n_batches), chunksize=1)
37+
38+
return r
39+
40+
41+
def sequential_mc_run(random_states, n_workers, n_batches, mc_func):
42+
for rs in random_states:
43+
for _ in range(n_batches):
44+
yield mc_func(rs)

example/parallel_random_states.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import mkl_random as rnd
2+
3+
4+
def build_MT2203_random_states(seed, id0, n_workers):
5+
# Create instances of RandomState for each worker process from MT2203 family of generators
6+
return (rnd.RandomState(seed, brng=('MT2203', id0 + idx)) for idx in range(n_workers))
7+
8+
9+
def build_SFMT19937_random_states(seed, jump_size, n_workers):
10+
import copy
11+
# Create instances of RandomState for each worker process from MT2203 family of generators
12+
rs = rnd.RandomState(seed, brng='SFMT19937')
13+
yield copy.copy(rs)
14+
for _ in range(1, n_workers):
15+
rs.skipahead(jump_size)
16+
yield copy.copy(rs)
17+

example/stick_tetrahedron.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import numpy as np
2+
from parallel_mc import parallel_mc_run, sequential_mc_run
3+
from parallel_random_states import build_MT2203_random_states
4+
from sticky_math import mc_six_piece_stick_tetrahedron_prob
5+
from arg_parsing import parse_arguments
6+
7+
def mc_runner(rs):
8+
return mc_six_piece_stick_tetrahedron_prob(rs, batch_size)
9+
10+
def aggregate_mc_counts(counts, n_batches, batch_size):
11+
ps = counts / batch_size
12+
# compute sample estimator's mean and standard deviation
13+
p_est = ps.mean()
14+
p_std = ps.std()/np.sqrt(batches)
15+
16+
# compute parameters for Baysean posterior of the probability
17+
event_count = 0
18+
nonevent_count = 0
19+
for ni in counts:
20+
event_count += int(ni)
21+
nonevent_count += int(batch_size - ni)
22+
23+
assert event_count >= 0
24+
assert nonevent_count >= 0
25+
return (p_est, p_std, event_count, nonevent_count)
26+
27+
28+
def print_result(p_est, p_std, mc_size):
29+
dig = 3 - int(np.log10(p_std)) # only show 3 digits past width of confidence interval
30+
frm_str = "{0:0." + str(dig) + "f}"
31+
32+
print(("Monte-Carlo estimate of probability: " + frm_str).format(p_est))
33+
print(("Population estimate of the estimator's standard deviation: " + frm_str).format(p_std))
34+
print(("Expected standard deviation of the estimator: " + frm_str).format(np.sqrt(p_est * (1-p_est)/mc_size)))
35+
print("Total MC size: {}".format(mc_size))
36+
37+
38+
if __name__ == '__main__':
39+
import multiprocessing as mp
40+
from itertools import repeat
41+
from timeit import default_timer as timer
42+
import sys
43+
44+
args = parse_arguments()
45+
46+
seed = args.seed
47+
n_workers = args.processes
48+
if n_workers <= 0:
49+
n_workers = mp.cpu_count()
50+
51+
batch_size = args.batch_size
52+
batches = args.batch_count
53+
id0 = args.id_offset
54+
55+
t0 = timer()
56+
57+
rss = build_MT2203_random_states(seed, id0, n_workers)
58+
r = parallel_mc_run(rss, n_workers, batches, mc_runner)
59+
# r = sequential_mc_run(rss, n_workers, batches, mc_runner)
60+
61+
# retrieve values of estimates into numpy array
62+
counts = np.fromiter(r, dtype=np.double)
63+
p_est, p_std, event_count, nonevent_count = aggregate_mc_counts(counts, batches, batch_size)
64+
65+
t1 = timer()
66+
67+
68+
print("Input parameters: -s {seed} -b {batchSize} -n {numBatches} -p {processes} -d {idOffset}".format(
69+
seed=args.seed, batchSize=args.batch_size, numBatches=args.batch_count, processes=n_workers, idOffset=args.id_offset))
70+
print("")
71+
print_result(p_est, p_std, batches * batch_size)
72+
print("")
73+
print("Bayesian posterior beta distribution parameters: ({0}, {1})".format(event_count, nonevent_count))
74+
print("")
75+
print("Execution time: {0:0.3f} seconds".format(t1-t0))

0 commit comments

Comments
 (0)