Skip to content

Init: Randomized Quasi Monte Carlo Method #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
.mypy_cache/
.pytest_cache/
__pycache__/
.hypothesis
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add newline at the end of file

136 changes: 136 additions & 0 deletions src/algorithms/support_algorithms/rqmc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import timeit
from typing import Any, Callable

import numpy as np
import numpy._typing as tpg
import scipy


class RQMC:
"""Randomize Quasi Monte Carlo Method

Args:
func: integrated function
error_tolerance: pre-specified error tolerance
count: number of rows of random values matrix
base_n: number of columns of random values matrix
i_max: allowed number of cycles
a: parameter for quantile of normal distribution

"""

def __init__(
self,
func: Callable,
error_tolerance: float = 1e-6,
count: int = 25,
base_n: int = 2**4,
i_max: int = 600,
a: float = 0.00047,
):
self.func = func
self.error_tolerance = error_tolerance
self.count = count
self.base_n = base_n
self.i_max = i_max
self.z = scipy.stats.norm.ppf(1 - a / 2)

def independent_estimator(self, values: np._typing.NDArray) -> float:
"""Apply function to row of matrix and find mean of row

Args:
values: row of random values matrix

Returns: mean of row

"""
vfunc = np.vectorize(self.func)
return 1 / len(values) * np.sum(vfunc(values))

def estimator(self, random_matrix: np._typing.NDArray) -> tuple[float, np._typing.NDArray]:
"""Find mean of all rows

Args:
random_matrix: matrix of random values

Returns: mean of all rows

"""
values = np.array(list(map(self.independent_estimator, random_matrix)))
return 1 / self.count * np.sum(values), values

def update_independent_estimator(self, i: int, old_value: float, new_values: np._typing.NDArray) -> float:
"""Update mean of row

Args:
i: step count
old_value: previous value of row on i-1 step
new_values: new generated row of random values

Returns: Updated mean of row

"""
return (i * old_value + (i + self.base_n) * self.independent_estimator(new_values[: i * self.base_n])) / (
2 * i + self.base_n
)

def update(
self, j: int, old_values: np._typing.NDArray, random_matrix: np._typing.NDArray
) -> tuple[float, np._typing.NDArray]:
"""Update mean of all rows

Args:
j: step count
old_values: previous values of row on i-1 step
random_matrix: new generated matrix of random values

Returns:Updated mean of all rows

"""
values = []
sum_of_new: float = 0.0
for i in range(self.count):
old_value, new_values = old_values[i], random_matrix[i]
value = self.update_independent_estimator(j, old_value, new_values)
values.append(value)
sum_of_new += value
values = np.array(values)
return (1 / self.count) * sum_of_new, values

def sigma(self, values: np._typing.NDArray, approximation: float) -> float:
"""Calculate parameter sigma for estimation

Args:
values:
approximation:

Returns: return sigma parameter

"""
diff = np.sum(np.power(values - approximation, 2))
return np.sqrt(1 / (self.count - 1) * diff)

def rqmc(self) -> float:
"""Main function of algorithm

Returns: approximation for integral of function from 0 to 1

"""
sample = np.random.rand(self.count, self.base_n)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Получающийся таким образом sample имеет равномерное распределение на [0;1], это не тоже самое что последовательность Соболя:
https://en.wikipedia.org/wiki/Sobol_sequence
Отсюда и проблемы с точностью

Здесь sample необходимо брать как матрицу с одинаковыми строчками, где каждая строчка состоит из элементов последовательности Соболя с номерами 1, ...self.base_n. После чего генерировать self.count независимых случайных величин U_1, .... U_B с равномерным распределением на отрезке [0; 1] и i-ую строчку матрицы sample XOR-ить с U_i (это и есть digital shift)

approximation, values = self.estimator(sample)
current_error_tolerance = self.sigma(values, approximation) * self.z
for i in range(1, self.i_max):
if current_error_tolerance < self.error_tolerance:
return approximation
sample = np.random.rand(self.count, self.base_n * i)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Опять же, здесь sample необходимо брать как матрицу с одинаковыми строчками, где каждая строчка состоит из элементов последовательности Соболя с номерами

  • начиная с (self.base_n - 1) * i
  • заканчивая self.base_n * (i + 1)

После чего генерировать self.count независимых случайных величин U_1, .... U_B с равномерным распределением на отрезке [0; 1] и i-ую строчку матрицы sample XOR-ить с U_i

approximation, values = self.update(i * self.base_n, values, sample)
current_error_tolerance = self.sigma(values, approximation) * self.z
return approximation
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Стоит возвращать также и current_error_tolerance


def __call__(self) -> float:
"""Interface for users

Returns: approximation for integral of function from 0 to 1

"""
return self.rqmc()
50 changes: 50 additions & 0 deletions tests/algorithms/support_algorithms/test_rqmc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from typing import Callable

import numpy as np
import pytest

from src.algorithms.support_algorithms.rqmc import RQMC


def loss_func(true_func: Callable, rqms: Callable, count: int):
true_value = true_func(1) - true_func(0)
sum_of_diff = 0
for _ in range(count):
sum_of_diff += abs(true_value - rqms())
return 1 / count * sum_of_diff


class TestSimplyFunctions:
error_tolerance = 1e-3

def test_constant_func(self):
rqmc = RQMC(lambda x: 1, error_tolerance=self.error_tolerance)
assert loss_func(lambda x: x, rqmc.rqmc, 1000) < self.error_tolerance

def test_linear_func(self):
rqmc = RQMC(lambda x: x, error_tolerance=self.error_tolerance)
assert loss_func(lambda x: np.power(x, 2) / 2, rqmc.rqmc, 100) < self.error_tolerance

def test_polynom_func(self):
rqmc = RQMC(lambda x: x**3 - x**2 + 1, error_tolerance=self.error_tolerance)
assert loss_func(lambda x: (x**4) / 4 - (x**3) / 3 + x, rqmc.rqmc, 100) < self.error_tolerance


class TestHardFunctions:
error_tolerance = 1e-3

def test_trigonometric_func(self):
rqmc = RQMC(lambda x: np.sin(x) + np.cos(x), error_tolerance=self.error_tolerance, i_max=100)
assert loss_func(lambda x: np.sin(x) - np.cos(x), rqmc.rqmc, 100) < self.error_tolerance

def test_mix_function(self):
rqmc = RQMC(
lambda x: (x / np.sin(x)) + (np.exp(-x) / np.cos(x)), error_tolerance=self.error_tolerance, i_max=100
)
assert loss_func(lambda x: 1.79789274334 if x == 1 else 0, rqmc.rqmc, 100)

def test_log_function(self):
rqmc = RQMC(
lambda x: np.sign(x - 0.5) * abs(np.log(abs(x - 0.5))), error_tolerance=self.error_tolerance, i_max=100
)
assert loss_func(lambda x: 0, rqmc.rqmc, 100)
Loading