Init: Randomized Quasi Monte Carlo Method #4

Merged 6 commits on Jul 29, 2024
Changes from 1 commit
2 changes: 1 addition & 1 deletion .gitignore
@@ -2,4 +2,4 @@
.mypy_cache/
.pytest_cache/
__pycache__/
.hypothesis
.hypothesis/
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,3 +1,4 @@
numpy~=1.26.4
scipy~=1.13.1
matplotlib~=3.8.4
numba~=0.59.0
129 changes: 107 additions & 22 deletions src/algorithms/support_algorithms/rqmc.py
@@ -1,9 +1,12 @@
import timeit
from typing import Any, Callable
from typing import Callable

import numpy as np
import numpy._typing as tpg
import scipy
from numba import njit

Collaborator comment: Is numba actually needed at this point?

BITS = 64
"""Number of bits in XOR"""


class RQMC:
@@ -24,18 +27,51 @@ def __init__(
func: Callable,
error_tolerance: float = 1e-6,
count: int = 25,
base_n: int = 2**4,
i_max: int = 600,
base_n: int = 2**6,
i_max: int = 100,
a: float = 0.00047,
):
self._args_parse(error_tolerance, count, base_n, i_max, a)
self.func = func
self.error_tolerance = error_tolerance
self.count = count
self.base_n = base_n
self.i_max = i_max
self.z = scipy.stats.norm.ppf(1 - a / 2)

def independent_estimator(self, values: np._typing.NDArray) -> float:
@staticmethod
def _args_parse(error_tolerance: float, count: int, base_n: int, i_max: int, a: float) -> None:
"""Parse arguments
Collaborator comment: This isn't really parsing, it's more of a validation.

Args:
error_tolerance: pre-specified error tolerance
count: number of rows of random values matrix
base_n: number of columns of random values matrix
i_max: allowed number of cycles
a: parameter for quantile of normal distribution

Returns: None

Raises:
ValueError: if any argument is not positive
ValueError: if base_n is not a power of 2
"""

if error_tolerance < 0:
raise ValueError("Error tolerance must be positive")
if count <= 0:
raise ValueError("Count must be positive")
if base_n <= 0:
raise ValueError("Base n must be positive")
if base_n & (base_n - 1) != 0:
raise ValueError("Base n must be power of 2")
if i_max <= 0:
raise ValueError("i_max must be positive")
if a <= 0 or a > 2:
raise ValueError("a upper bound is 2 and lower bound is 0")

def _independent_estimator(self, values: np._typing.NDArray) -> float:
"""Apply function to row of matrix and find mean of row

Args:
@@ -47,7 +83,7 @@ def independent_estimator(self, values: np._typing.NDArray) -> float:
vfunc = np.vectorize(self.func)
return 1 / len(values) * np.sum(vfunc(values))

def estimator(self, random_matrix: np._typing.NDArray) -> tuple[float, np._typing.NDArray]:
def _estimator(self, random_matrix: np._typing.NDArray) -> tuple[float, np._typing.NDArray]:
"""Find mean of all rows

Args:
@@ -56,10 +92,10 @@ def estimator(self, random_matrix: np._typing.NDArray) -> tuple[float, np._typin
Returns: mean of all rows

"""
values = np.array(list(map(self.independent_estimator, random_matrix)))
values = np.array(list(map(self._independent_estimator, random_matrix)))
return 1 / self.count * np.sum(values), values

def update_independent_estimator(self, i: int, old_value: float, new_values: np._typing.NDArray) -> float:
def _update_independent_estimator(self, i: int, old_value: float, new_values: np._typing.NDArray) -> float:
"""Update mean of row

Args:
@@ -70,11 +106,11 @@ def update_independent_estimator(self, i: int, old_value: float, new_values: np.
Returns: Updated mean of row

"""
return (i * old_value + (i + self.base_n) * self.independent_estimator(new_values[: i * self.base_n])) / (
return (i * old_value + (i + self.base_n) * self._independent_estimator(new_values[: i * self.base_n])) / (
2 * i + self.base_n
)

def update(
def _update(
self, j: int, old_values: np._typing.NDArray, random_matrix: np._typing.NDArray
) -> tuple[float, np._typing.NDArray]:
"""Update mean of all rows
@@ -91,13 +127,13 @@ def update(
sum_of_new: float = 0.0
for i in range(self.count):
old_value, new_values = old_values[i], random_matrix[i]
value = self.update_independent_estimator(j, old_value, new_values)
value = self._update_independent_estimator(j, old_value, new_values)
values.append(value)
sum_of_new += value
values = np.array(values)
return (1 / self.count) * sum_of_new, values

def sigma(self, values: np._typing.NDArray, approximation: float) -> float:
def _sigma(self, values: np._typing.NDArray, approximation: float) -> float:
"""Calculate parameter sigma for estimation

Args:
@@ -110,24 +146,73 @@ def sigma(self, values: np._typing.NDArray, approximation: float) -> float:
diff = np.sum(np.power(values - approximation, 2))
return np.sqrt(1 / (self.count - 1) * diff)

def rqmc(self) -> float:
def rqmc(self) -> tuple[float, float]:
"""Main function of algorithm

Returns: approximation for integral of function from 0 to 1 and its estimated error

"""
sample = np.random.rand(self.count, self.base_n)
approximation, values = self.estimator(sample)
current_error_tolerance = self.sigma(values, approximation) * self.z
sobol_sampler = scipy.stats.qmc.Sobol(d=1, scramble=False)
sobol_sample = np.repeat(sobol_sampler.random(self.base_n).transpose(), self.count, axis=0)
xor_sample = np.array(np.random.rand(1, self.count)[0])
sample = self._digital_shift(sobol_sample, xor_sample)
approximation, values = self._estimator(sample)
current_error_tolerance = self._sigma(values, approximation) * self.z
for i in range(1, self.i_max):
if current_error_tolerance < self.error_tolerance:
return approximation
sample = np.random.rand(self.count, self.base_n * i)
approximation, values = self.update(i * self.base_n, values, sample)
current_error_tolerance = self.sigma(values, approximation) * self.z
return approximation
return approximation, current_error_tolerance
sobol_sampler.reset()
sobol_sample = np.repeat(sobol_sampler.random(self.base_n * i).transpose(), self.count, axis=0)
sample = self._digital_shift(sobol_sample, xor_sample)
approximation, values = self._update(i * self.base_n, values, sample)
current_error_tolerance = self._sigma(values, approximation) * self.z
return approximation, current_error_tolerance

def _digital_shift(self, sobol_sequences: tpg.NDArray, xor_sample: tpg.NDArray) -> tpg.NDArray:
"""Digital shift of the sobol sequence

Args:
sobol_sequences: B Sobol sequences with length i*N
xor_sample: Sample of Uniform distribution with length B

Returns: XOR Sobol sequences with xor sample

"""

def inner_func(sequence: tpg.NDArray, random_value: float) -> tpg.NDArray:
"""Xor sequence with random value

Args:
sequence: Sobol sequence of length i*N
random_value: Random value from sample of Uniform distribution

Returns: XOR sequence with random value

"""
return np.array(list(map(lambda x: self._xor_float(x, random_value), sequence)))

pair = list(zip(sobol_sequences, xor_sample))
sobol_sequences = np.array(list(map(lambda x: inner_func(*x), pair)))

return sobol_sequences

@staticmethod
@njit(fastmath=True)
def _xor_float(a: float, b: float) -> float:
"""XOR float values

Args:
a: First float value
b: Second float value

Returns: XOR float value

"""
a = int(a * (2**BITS))
b = int(b * (2**BITS))
return np.bitwise_xor(a, b) / 2**BITS

def __call__(self) -> float:
def __call__(self) -> tuple[float, float]:
"""Interface for users

Returns: approximation for integral of function from 0 to 1
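A minimal usage sketch of the class introduced above, based on the constructor defaults and the updated tuple[float, float] return annotation in this diff. The import path is an assumption taken from the file location src/algorithms/support_algorithms/rqmc.py, and the sketch assumes __call__ delegates to rqmc(), as its annotation suggests (the method body is collapsed in this view):

import numpy as np

from src.algorithms.support_algorithms.rqmc import RQMC  # assumed import path

# Integrate sin(x) + cos(x) over [0, 1]; the exact value is sin(1) + 1 - cos(1), roughly 1.3012.
rqmc = RQMC(lambda x: np.sin(x) + np.cos(x), error_tolerance=1e-5, i_max=100)
approximation, error_estimate = rqmc()  # expected to return (estimate, estimated error)
print(approximation, error_estimate)
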
25 changes: 22 additions & 3 deletions tests/algorithms/support_algorithms/test_rqmc.py
@@ -10,12 +10,12 @@ def loss_func(true_func: Callable, rqms: Callable, count: int):
true_value = true_func(1) - true_func(0)
sum_of_diff = 0
for _ in range(count):
sum_of_diff += abs(true_value - rqms())
sum_of_diff += abs(true_value - rqms()[0])
return 1 / count * sum_of_diff


class TestSimplyFunctions:
error_tolerance = 1e-3
error_tolerance = 1e-5

def test_constant_func(self):
rqmc = RQMC(lambda x: 1, error_tolerance=self.error_tolerance)
@@ -31,7 +31,7 @@ def test_polynom_func(self):


class TestHardFunctions:
error_tolerance = 1e-3
error_tolerance = 1e-4

def test_trigonometric_func(self):
rqmc = RQMC(lambda x: np.sin(x) + np.cos(x), error_tolerance=self.error_tolerance, i_max=100)
@@ -48,3 +48,22 @@ def test_log_function(self):
lambda x: np.sign(x - 0.5) * abs(np.log(abs(x - 0.5))), error_tolerance=self.error_tolerance, i_max=100
)
assert loss_func(lambda x: 0, rqmc.rqmc, 100)


class TestArgsParse:
@pytest.mark.parametrize(
"args",
[
(-1, 1, 2, 1, 1),
(1, -1, 2, 1, 1),
(1, 1, -1, 1, 1),
(1, 1, 2, -1, 1),
(1, 1, 1, 1, -1),
(1, 1, 3, 1, 1),
(1, 1, 2, 1, 10),
],
)
def test_args_parse(self, args):
print(args)
with pytest.raises(ValueError):
RQMC._args_parse(*args)
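
As a side note on the digital shift in rqmc.py above: _xor_float treats both operands as 64-bit binary fractions, XORs the scaled integers, and scales the result back to [0, 1). A standalone, pure-Python sketch of that fixed-point idea (it does not import the numba-compiled helper from this PR):

BITS = 64  # same bit width as in rqmc.py


def xor_float(a: float, b: float) -> float:
    # Interpret a and b as 64-bit binary fractions, XOR them, and rescale to [0, 1).
    a_int = int(a * 2**BITS)
    b_int = int(b * 2**BITS)
    return (a_int ^ b_int) / 2**BITS


# 0.625 is 0.101 in binary and 0.25 is 0.010, so their XOR is 0.111 = 0.875.
print(xor_float(0.625, 0.25))  # 0.875
print(xor_float(0.625, 0.0))   # shifting by 0.0 leaves the value unchanged: 0.625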