Skip to content

Semiparametric Mu Estimation for NMV mixtures #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.dev.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mypy~=1.10.0
black~=24.4.2
isort~=5.13.2
mpmath~=1.3.0
pytest~=7.4.4
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
numpy~=1.26.4
scipy~=1.13.1
matplotlib~=3.8.4
mpmath~=1.3.0
120 changes: 120 additions & 0 deletions src/algorithms/nvm_semi_param_algorithms/mu_estimation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import math
from typing import Callable, TypedDict, Unpack

import mpmath
import numpy as np
from numpy import _typing

# Default upper bound of the root-search interval (parameter ``m``).
M_DEFAULT_VALUE = 1000
# Default threshold on |w(p)| below which the bisection stops (parameter ``tolerance``).
TOLERANCE_DEFAULT_VALUE = 10**-5
# Default cap on the number of bisection steps (parameter ``max_iterations``).
MAX_ITERATIONS_DEFAULT_VALUE = 10**9


def _default_omega(x: float) -> float:
    """Default weight function: ``-sin(x)`` on ``[-pi, pi]`` and zero elsewhere.

    Args:
        x: point at which the weight is evaluated

    Returns: weight value at x

    """
    return -math.sin(x) if abs(x) <= math.pi else 0.0


# Default weight function (parameter ``omega``); kept under the constant name used by the estimator.
OMEGA_DEFAULT_VALUE = _default_omega


class SemiParametricMuEstimation:
"""Estimation of mu parameter of NVM mixture represented in canonical form Y = alpha + mu*xi + sqrt(xi)*N,
where alpha = 0

Args:
sample: sample of the analysed distribution
params: parameters of the algorithm
m - search area,
tolerance - defines where to stop binary search,
omega - Lipschitz continuous odd function on R with compact support

"""

class ParamsAnnotation(TypedDict, total=False):
"""Class for parameters annotation"""

m: float
tolerance: float
omega: Callable[[float], float]
max_iterations: float

def __init__(self, sample: _typing.ArrayLike = None, **kwargs: Unpack[ParamsAnnotation]):
self.sample = np.array([]) if sample is None else sample
self.m, self.tolerance, self.omega, self.max_iterations = self._validate_kwargs(**kwargs)

def _validate_kwargs(
self, **kwargs: Unpack[ParamsAnnotation]
) -> tuple[float, float, Callable[[float], float], float]:
"""Parameters validation function

Args:
kwargs: Parameters of Algorithm

Returns: Parameters of Algorithm

"""
if any([i not in self.ParamsAnnotation.__annotations__ for i in kwargs]):
raise ValueError("Got unexpected parameter")
if "m" in kwargs and (not isinstance(kwargs.get("m"), int) or kwargs.get("m", -1) <= 0):
raise ValueError("Expected positive integer as parameter m")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if "m" in kwargs and (not isinstance(kwargs.get("m"), int) or kwargs.get("m", -1) <= 0):
raise ValueError("Expected positive integer as parameter m")
m = kwargs.get("m", M_DEFAULT_VALUE)
if not isinstance(m, int) or m <= 0:
raise ValueError("Expected positive integer as parameter m")

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Мне кажется, так попроще

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А ещё можно ключи, по которым получаются параметры, тоже в константы вынести

if "tolerance" in kwargs and (
not isinstance(kwargs.get("tolerance"), (int, float)) or kwargs.get("tolerance", -1) <= 0
):
raise ValueError("Expected positive float as parameter tolerance")
if "omega" in kwargs and not callable(kwargs.get("omega")):
raise ValueError("Expected callable object as parameter omega")
if "max_iterations" in kwargs and (
not isinstance(kwargs.get("max_iterations"), int) or kwargs.get("max_iterations", -1) <= 0
):
raise ValueError("Expected positive integer as parameter max_iterations")
return (
kwargs.get("m", M_DEFAULT_VALUE),
kwargs.get("tolerance", TOLERANCE_DEFAULT_VALUE),
kwargs.get("omega", OMEGA_DEFAULT_VALUE),
kwargs.get("max_iterations", MAX_ITERATIONS_DEFAULT_VALUE),
)

def __w(self, p: float, sample: np._typing.NDArray) -> float:
"""Root of this function is an estimation of mu

Args:
p: float
sample: sample of the analysed distribution

Returns: function value

"""
y = 0.0
for x in sample:
try:
e = math.exp(-p * x)
except OverflowError:
e = mpmath.exp(-p * x)
y += e * self.omega(x)
return y

def algorithm(self, sample: np._typing.NDArray) -> float:
"""Root of this function is an estimation of mu

Args:
sample: sample of the analysed distribution

Returns: estimated mu value

"""

if self.__w(0, sample) == 0:
return 0
if self.__w(0, sample) > 0:
return -1 * self.algorithm(-1 * sample)
if self.__w(self.m, sample) < 0:
return self.m

left, right = 0.0, self.m
iteration = 0
while left <= right:
mid = (right + left) / 2
if iteration > self.max_iterations:
return mid
iteration += 1
if abs(self.__w(mid, sample)) < self.tolerance:
return mid
elif self.__w(mid, sample) < 0:
left = mid
else:
right = mid
return -1
25 changes: 7 additions & 18 deletions src/mixtures/abstract_mixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,9 @@
class AbstractMixtures(metaclass=ABCMeta):
"""Base class for Mixtures"""

def __init__(self, param_collector: Registry, semi_param_collector: Registry) -> None:
"""

Args:
param_collector: Collector of implementations of parametric algorithms
semi_param_collector: Collector of implementations of semi-parametric algorithms

"""
self.param_collector = param_collector
self.semi_param_collector = semi_param_collector
def __init__(self) -> None:
self.param_collector: Registry = Registry()
self.semi_param_collector: Registry = Registry()

@abstractmethod
def classic_generate(
Expand All @@ -35,7 +28,6 @@ def classic_generate(
Returns: samples of given size

"""
...

@abstractmethod
def canonical_generate(
Expand All @@ -51,32 +43,29 @@ def canonical_generate(
Returns: samples of given size

"""
...

@abstractmethod
def param_algorithm(self, name: str, selection: _typing.ArrayLike, params: list[float]) -> Any:
def param_algorithm(self, name: str, sample: _typing.ArrayLike, params: dict) -> Any:
"""Select and run parametric algorithm

Args:
name: Name of Algorithm
selection: Vector of random values
sample: Vector of random values
params: Parameters of Algorithm

Returns: TODO

"""
...

@abstractmethod
def semi_param_algorithm(self, name: str, selection: _typing.ArrayLike, params: list[float]) -> Any:
def semi_param_algorithm(self, name: str, sample: _typing.ArrayLike, params: dict) -> Any:
"""Select and run semi-parametric algorithm

Args:
name: Name of Algorithm
selection: Vector of random values
sample: Vector of random values
params: Parameters of Algorithm

Returns: TODO

"""
...
26 changes: 10 additions & 16 deletions src/mixtures/nm_mixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,11 @@
from numpy import _typing

from src.mixtures.abstract_mixture import AbstractMixtures
from src.register.register import Registry


class NormalMeanMixtures(AbstractMixtures):
def __init__(self, param_collector: Registry, semi_param_collector: Registry) -> None:
"""

Args:
param_collector: Collector of implementations of parametric algorithms
semi_param_collector: Collector of implementations of semi-parametric algorithms

"""
super().__init__(param_collector, semi_param_collector)
def __init__(self) -> None:
super().__init__()
...

@staticmethod
Expand Down Expand Up @@ -89,28 +81,30 @@ def canonical_generate(
normal_values = scipy.stats.norm.rvs(size=size)
return mixing_values + sigma * normal_values

def param_algorithm(self, name: str, selection: _typing.ArrayLike, params: list[float]) -> Any:
def param_algorithm(self, name: str, sample: _typing.ArrayLike, params: dict) -> Any:
"""Select and run parametric algorithm for NMM

Args:
name: Name of Algorithm
selection: Vector of random values
sample: Vector of random values
params: Parameters of Algorithm

Returns: TODO

"""
...
cls = self.param_collector.dispatch(name)(sample, **params)
return cls.algorithm(sample)

def semi_param_algorithm(self, name: str, selection: _typing.ArrayLike, params: list[float]) -> Any:
def semi_param_algorithm(self, name: str, sample: _typing.ArrayLike, params: dict) -> Any:
"""Select and run semi-parametric algorithm for NMM

Args:
name: Name of Algorithm
selection: Vector of random values
sample: Vector of random values
params: Parameters of Algorithm

Returns: TODO

"""
...
cls = self.semi_param_collector.dispatch(name)(sample, **params)
return cls.algorithm(sample)
30 changes: 14 additions & 16 deletions src/mixtures/nmv_mixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,15 @@
import scipy
from numpy import _typing

from src.algorithms.nvm_semi_param_algorithms.mu_estimation import SemiParametricMuEstimation
from src.mixtures.abstract_mixture import AbstractMixtures
from src.register.register import Registry


class NormalMeanVarianceMixtures(AbstractMixtures):

def __init__(self, param_collector: Registry, semi_param_collector: Registry) -> None:
"""

Args:
param_collector: Collector of implementations of parametric algorithms
semi_param_collector: Collector of implementations of semi-parametric algorithms

"""
super().__init__(param_collector, semi_param_collector)
def __init__(self) -> None:
super().__init__()
self.semi_param_collector.register("mu_estimation")(SemiParametricMuEstimation)
...

@staticmethod
Expand Down Expand Up @@ -88,28 +82,32 @@ def canonical_generate(
normal_values = scipy.stats.norm.rvs(size=size)
return alpha + mu * mixing_values + (mixing_values**0.5) * normal_values

def param_algorithm(self, name: str, selection: _typing.ArrayLike, params: list[float]) -> Any:
def param_algorithm(self, name: str, sample: _typing.ArrayLike, params: dict) -> Any:
"""Select and run parametric algorithm for NMVM

Args:
name: Name of Algorithm
selection: Vector of random values
sample: Vector of random values
params: Parameters of Algorithm

Returns: TODO

"""
...
cls = self.param_collector.dispatch(name)(sample, **params)
return cls.algorithm(sample)

def semi_param_algorithm(self, name: str, selection: _typing.ArrayLike, params: list[float]) -> Any:
def semi_param_algorithm(self, name: str, sample: _typing.ArrayLike, params: dict = None) -> Any:
"""Select and run semi-parametric algorithm for NMVM

Args:
name: Name of Algorithm
selection: Vector of random values
sample: Vector of random values
params: Parameters of Algorithm

Returns: TODO

"""
...
if params is None:
params = {}
cls = self.semi_param_collector.dispatch(name)(sample, **params)
return cls.algorithm(sample)
26 changes: 10 additions & 16 deletions src/mixtures/nv_mixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,12 @@
from numpy import _typing

from src.mixtures.abstract_mixture import AbstractMixtures
from src.register.register import Registry


class NormalVarianceMixtures(AbstractMixtures):

def __init__(self, param_collector: Registry, semi_param_collector: Registry) -> None:
"""

Args:
param_collector: Collector of implementations of parametric algorithms
semi_param_collector: Collector of implementations of semi-parametric algorithms

"""
super().__init__(param_collector, semi_param_collector)
def __init__(self) -> None:
super().__init__()
...

@staticmethod
Expand Down Expand Up @@ -88,28 +80,30 @@ def canonical_generate(
normal_values = scipy.stats.norm.rvs(size=size)
return alpha + (mixing_values**0.5) * normal_values

def param_algorithm(self, name: str, selection: _typing.ArrayLike, params: list[float]) -> Any:
def param_algorithm(self, name: str, sample: _typing.ArrayLike, params: dict) -> Any:
"""Select and run parametric algorithm for NVM

Args:
name: Name of Algorithm
selection: Vector of random values
sample: Vector of random values
params: Parameters of Algorithm

Returns: TODO

"""
...
cls = self.param_collector.dispatch(name)(sample, **params)
return cls.algorithm(sample)

def semi_param_algorithm(self, name: str, selection: _typing.ArrayLike, params: list[float]) -> Any:
def semi_param_algorithm(self, name: str, sample: _typing.ArrayLike, params: dict) -> Any:
"""Select and run semi-parametric algorithm for NVM

Args:
name: Name of Algorithm
selection: Vector of random values
sample: Vector of random values
params: Parameters of Algorithm

Returns: TODO

"""
...
cls = self.semi_param_collector.dispatch(name)(sample, **params)
return cls.algorithm(sample)
Empty file added tests/algorithms/__init__.py
Empty file.
Empty file.
Loading
Loading