diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2e0a35e..4bf1af0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -26,7 +26,7 @@ repos: - id: debug-statements - id: end-of-file-fixer - id: trailing-whitespace - - repo: https://gitlab.com/pycqa/flake8 + - repo: https://github.com/pycqa/flake8 rev: 3.8.4 hooks: - id: flake8 diff --git a/pycid/analyze/reasoning_patterns.py b/pycid/analyze/reasoning_patterns.py index 8b7085a..0911bde 100644 --- a/pycid/analyze/reasoning_patterns.py +++ b/pycid/analyze/reasoning_patterns.py @@ -41,7 +41,7 @@ def _path_is_effective(mb: MACIDBase, path: List[str], effective_set: Set[str]) def _directed_effective_path_not_through_set_y( - mb: MACIDBase, start: str, finish: str, effective_set: Set[str], y: Set[str] = None + mb: MACIDBase, start: str, finish: str, effective_set: Set[str], y: Optional[Set[str]] = None ) -> bool: """Check whether a directed effective path exists that doesn't pass through any of the nodes in the set y.""" if y is None: diff --git a/pycid/core/causal_bayesian_network.py b/pycid/core/causal_bayesian_network.py index e8855ad..f146ccb 100644 --- a/pycid/core/causal_bayesian_network.py +++ b/pycid/core/causal_bayesian_network.py @@ -81,7 +81,7 @@ def to_tabular_cpd(self, variable: str, relationship: Relationship) -> TabularCP elif isinstance(relationship, Mapping): return ConstantCPD(variable, relationship, self.cbn) - def __init__(self, edges: Iterable[Tuple[str, str]] = None, **kwargs: Any): + def __init__(self, edges: Optional[Iterable[Tuple[str, str]]] = None, **kwargs: Any): """Initialize a Causal Bayesian Network Parameters @@ -130,7 +130,7 @@ def is_structural_causal_model(self) -> bool: return True def query( - self, query: Iterable[str], context: Dict[str, Outcome], intervention: Dict[str, Outcome] = None + self, query: Iterable[str], context: Dict[str, Outcome], intervention: Optional[Dict[str, Outcome]] = None ) -> BeliefPropagation: """Return P(query|context, do(intervention))*P(context | do(intervention)). @@ -195,7 +195,7 @@ def expected_value( self, variables: Iterable[str], context: Dict[str, Outcome], - intervention: Dict[str, Outcome] = None, + intervention: Optional[Dict[str, Outcome]] = None, ) -> List[float]: """Compute the expected value of a real-valued variable for a given context, under an optional intervention @@ -267,9 +267,9 @@ def _get_label(self, node: str) -> str: def draw( self, - node_color: Callable[[str], Union[str, np.ndarray]] = None, - node_shape: Callable[[str], str] = None, - node_label: Callable[[str], str] = None, + node_color: Optional[Callable[[str], Union[str, np.ndarray]]] = None, + node_shape: Optional[Callable[[str], str]] = None, + node_label: Optional[Callable[[str], str]] = None, layout: Optional[Callable[[Any], Dict[Any, Any]]] = None, ) -> None: """ diff --git a/pycid/core/cpd.py b/pycid/core/cpd.py index 449808d..7c04a98 100644 --- a/pycid/core/cpd.py +++ b/pycid/core/cpd.py @@ -49,7 +49,7 @@ def __init__( cbn: CausalBayesianNetwork, domain: Optional[Sequence[Outcome]] = None, state_names: Optional[Mapping[str, List]] = None, - label: str = None, + label: Optional[str] = None, ) -> None: """Initialize StochasticFunctionCPD with a variable name and a stochastic function. @@ -130,7 +130,7 @@ def stochastic_function(self, **pv: Outcome) -> Mapping[Outcome, float]: else: return {ret: 1} - def compute_label(self, function: Callable = None) -> str: + def compute_label(self, function: Optional[Callable] = None) -> str: """Try to generate a string that succinctly describes the relationship""" function = function if function is not None else self.func if hasattr(function, "__name__") and function.__name__ != "": @@ -224,7 +224,7 @@ def __init__( variable: str, dictionary: Mapping, cbn: CausalBayesianNetwork, - domain: Sequence[Outcome] = None, + domain: Optional[Sequence[Outcome]] = None, label: Optional[str] = None, ): super().__init__(variable, lambda **pv: dictionary, cbn, domain=domain, label=label or str(dictionary)) @@ -267,7 +267,7 @@ def discrete_uniform(domain: List[Outcome]) -> Dict[Outcome, float]: def noisy_copy( - value: Outcome, probability: float = 0.9, domain: List[Outcome] = None + value: Outcome, probability: float = 0.9, domain: Optional[List[Outcome]] = None ) -> Dict[Outcome, Optional[float]]: """specify a variable's CPD as copying the value of some other variable with a certain probability.""" dist = dict.fromkeys(domain) if domain else {} diff --git a/pycid/core/get_paths.py b/pycid/core/get_paths.py index 6ec33b2..0ceda45 100644 --- a/pycid/core/get_paths.py +++ b/pycid/core/get_paths.py @@ -1,4 +1,4 @@ -from typing import Callable, Iterable, Iterator, List, Sequence, Set, Tuple +from typing import Callable, Iterable, Iterator, List, Optional, Sequence, Set, Tuple import networkx as nx @@ -184,7 +184,7 @@ def _get_path_edges(cbn: CausalBayesianNetwork, path: Sequence[str]) -> List[Tup return structure -def is_active_path(cbn: CausalBayesianNetwork, path: Sequence[str], observed: Set[str] = None) -> bool: +def is_active_path(cbn: CausalBayesianNetwork, path: Sequence[str], observed: Optional[Set[str]] = None) -> bool: """ Check if a specifc path remains active given the 'observed' set of variables. """ @@ -213,7 +213,7 @@ def is_active_path(cbn: CausalBayesianNetwork, path: Sequence[str], observed: Se def is_active_indirect_frontdoor_trail( - cbn: CausalBayesianNetwork, start_node: str, end_node: str, observed: Set[str] = None + cbn: CausalBayesianNetwork, start_node: str, end_node: str, observed: Optional[Set[str]] = None ) -> bool: """ checks whether an active indirect frontdoor path exists given the 'observed' set of variables. @@ -240,7 +240,7 @@ def is_active_indirect_frontdoor_trail( def is_active_backdoor_trail( - cbn: CausalBayesianNetwork, start_node: str, end_node: str, observed: Set[str] = None + cbn: CausalBayesianNetwork, start_node: str, end_node: str, observed: Optional[Set[str]] = None ) -> bool: """ Returns true if there is a backdoor path that's active given the 'observed' set of nodes. diff --git a/pycid/core/macid_base.py b/pycid/core/macid_base.py index 4d372a8..9c9fc70 100644 --- a/pycid/core/macid_base.py +++ b/pycid/core/macid_base.py @@ -67,9 +67,9 @@ def to_tabular_cpd(self, variable: str, relationship: Union[Relationship, Sequen def __init__( self, - edges: Iterable[Tuple[str, str]] = None, - agent_decisions: Mapping[AgentLabel, List[str]] = None, - agent_utilities: Mapping[AgentLabel, List[str]] = None, + edges: Optional[Iterable[Tuple[str, str]]] = None, + agent_decisions: Optional[Mapping[AgentLabel, List[str]]] = None, + agent_utilities: Optional[Mapping[AgentLabel, List[str]]] = None, **kwargs: Any, ): """Initialize a new MACIDBase instance. @@ -142,7 +142,7 @@ def add_cpds(self, *cpds: TabularCPD, **relationships: Union[Relationship, List[ super().add_cpds(*cpds, **relationships) def query( - self, query: Iterable[str], context: Dict[str, Outcome], intervention: Dict[str, Outcome] = None + self, query: Iterable[str], context: Dict[str, Outcome], intervention: Optional[Dict[str, Outcome]] = None ) -> BeliefPropagation: """Return P(query|context, do(intervention))*P(context | do(intervention)). @@ -184,7 +184,7 @@ def query( return super().query(query, context, intervention) def expected_utility( - self, context: Dict[str, Outcome], intervention: Dict[str, Outcome] = None, agent: AgentLabel = 0 + self, context: Dict[str, Outcome], intervention: Optional[Dict[str, Outcome]] = None, agent: AgentLabel = 0 ) -> float: """Compute the expected utility of an agent for a given context and optional intervention diff --git a/pycid/core/mechanised_graph.py b/pycid/core/mechanised_graph.py new file mode 100644 index 0000000..163131b --- /dev/null +++ b/pycid/core/mechanised_graph.py @@ -0,0 +1,84 @@ +import itertools +from typing import Optional + +import matplotlib.pyplot as plt +import networkx as nx + +from pycid.core.macid_base import AgentLabel, MACIDBase + + +class MechanisedGraph: + def __init__(self, cid: MACIDBase): + super().__init__() + self.graph = nx.DiGraph() + self.agent_decision_mechanisms = { + agent: [decision + "_mec" for decision in decisions] for agent, decisions in cid.agent_decisions.items() + } + + # add agents + + # initialize the graph # + self.graph.add_nodes_from(cid.nodes) + self.graph.add_edges_from(cid.edges) + for node in cid.nodes: + self.graph.add_node(node + "_mec") + self.graph.add_edge(node + "_mec", node) + self.mechanism_nodes = [node + "_mec" for node in cid.nodes] + + # add edges for r-reachable mechanisms # + # combinations of decisions and all nodes + for (decision, node) in itertools.product(cid.decisions, cid.nodes): + if decision == node: + continue + if not cid.is_r_reachable(decision, node): + continue + self.graph.add_edge(node + "_mec", decision + "_mec") + + def is_sufficient_recall(self, agent: Optional[AgentLabel] = None) -> bool: + """ + Check if the specified agent or all agents have sufficient recall. + Sufficient recall is defined as the mechanized graph restricted to mechanisms + for decision variables belonging to the agent being acyclic. + + Parameters + ---------- + agent : Optional[AgentLabel] + The agent to check for sufficient recall. If not specified, all agents are checked. + + Returns + ------- + bool + True if the specified agent or all agents have sufficient recall, False otherwise. + """ + + if agent: + return self._is_sufficient_recall_single(agent) + return all(self._is_sufficient_recall_single(agent) for agent in self.agent_decision_mechanisms.keys()) + + def _is_sufficient_recall_single(self, agent: AgentLabel) -> bool: + """ + Calculates sufficient recall for a single agent. + """ + + # define a subgraph of the mechanised graph that only contains mechanisms for decision variables + decision_mechanisms = self.agent_decision_mechanisms[agent] + # restrict graph to just decision mechanisms + decision_mechanised_graph = self.graph.subgraph(decision_mechanisms) + is_dag = nx.is_directed_acyclic_graph(decision_mechanised_graph) # type: bool + return is_dag + + def is_sufficient_information(self) -> bool: + """ + this checks whether the mechanised graph restricted to just mechanisms for decision variables is acyclic + """ + # define a subgraph of the mechanised graph that only contains mechanisms for decision variables + decision_mechanisms = list(itertools.chain(*self.agent_decision_mechanisms.values())) + # TODO: Fix problem with inheritance + decision_mechanised_graph = self.graph.subgraph(decision_mechanisms) + is_dag = nx.is_directed_acyclic_graph(decision_mechanised_graph) # type: bool + return is_dag + + def draw(self) -> None: + """Draws full mechanised graph""" + nx.draw(self.graph, with_labels=True) + plt.show() diff --git a/pycid/core/relevance_graph.py b/pycid/core/relevance_graph.py index c3d1a33..c5b95c6 100644 --- a/pycid/core/relevance_graph.py +++ b/pycid/core/relevance_graph.py @@ -1,7 +1,7 @@ from __future__ import annotations import itertools -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Sequence +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Sequence import matplotlib.cm as cm import matplotlib.pyplot as plt @@ -21,7 +21,7 @@ class RelevanceGraph(nx.DiGraph): - an edge D -> D' exists iff D' is r-reachable from D (ie D strategically or probabilistically relies on D') """ - def __init__(self, cid: MACIDBase, decisions: Iterable[str] = None): + def __init__(self, cid: MACIDBase, decisions: Optional[Iterable[str]] = None): super().__init__() if decisions is None: decisions = cid.decisions diff --git a/pycid/examples/story_macids.py b/pycid/examples/story_macids.py index 22f8c17..7f65def 100644 --- a/pycid/examples/story_macids.py +++ b/pycid/examples/story_macids.py @@ -259,7 +259,7 @@ def robot_warehouse() -> MACID: B=lambda D1: noisy_copy(D1, probability=0.3, domain=[0, 1]), R=lambda B, D2: int(not B or D2), O=lambda D2: noisy_copy(D2, probability=0.6, domain=[0, 1]), - U1=lambda Q, B, O: int(Q and not O) - int(B), + U1=lambda Q, B, O: int(Q and not O) - int(B), # noqa: E741 U2=lambda R: R, ) return macid diff --git a/pycid/export/gambit.py b/pycid/export/gambit.py index a673dfa..ed81d65 100644 --- a/pycid/export/gambit.py +++ b/pycid/export/gambit.py @@ -177,7 +177,7 @@ def behavior_to_cpd( macid: MACIDBase, parents_to_infoset: Mapping[Tuple[Hashable, Tuple[Tuple[str, Any], ...]], pygambit.Infoset], behavior: pygambit.lib.libgambit.MixedStrategyProfile, - decisions_in_sg: Union[KeysView[str], Set[str]] = None, + decisions_in_sg: Optional[Union[KeysView[str], Set[str]]] = None, ) -> List[StochasticFunctionCPD]: """Convert a pygambit behavior strategy to list of CPDs for each decision node. Args: diff --git a/pycid/random/random_cpd.py b/pycid/random/random_cpd.py index 6bc3722..c270836 100644 --- a/pycid/random/random_cpd.py +++ b/pycid/random/random_cpd.py @@ -1,7 +1,7 @@ from __future__ import annotations import contextlib -from typing import Iterator, Mapping, Sequence +from typing import Iterator, Mapping, Optional, Sequence import numpy as np @@ -23,7 +23,9 @@ class RandomCPD: Sample a random CPD, with outcomes in the given domain """ - def __init__(self, domain: Sequence[Outcome] = None, smoothness: float = 1.0, seed: int = None) -> None: + def __init__( + self, domain: Optional[Sequence[Outcome]] = None, smoothness: float = 1.0, seed: Optional[int] = None + ) -> None: """ Parameters ---------- diff --git a/tests/test_mechanised.py b/tests/test_mechanised.py new file mode 100644 index 0000000..a3b6b51 --- /dev/null +++ b/tests/test_mechanised.py @@ -0,0 +1,50 @@ +import sys + +import pytest + +from pycid.core.macid import MACID +from pycid.core.mechanised_graph import MechanisedGraph +from pycid.examples.story_macids import forgetful_movie_star, taxi_competition + + +@pytest.fixture +def taxi() -> MACID: + return taxi_competition() + + +@pytest.fixture +def movie_star() -> MACID: + return forgetful_movie_star() + + +def test_create_mechanised_graph(taxi: MACID) -> None: + mech_graph = MechanisedGraph(taxi) + assert len(mech_graph.graph.nodes) == (2 * len(taxi.nodes)) + # Assert edges are correct in terms of r-relevance: There should be an edge from 'D2_mec' to 'D1_mec' + # because 'D2' is r-reachable from 'D1' + # print (mech_graph.edges) if fails + + +def test_create_r_reachable_mechanised_graph(taxi: MACID) -> None: + mech = MechanisedGraph(taxi) + # Assert edges are correct in terms of r-relevance: There should be an edge from 'D2_mec' to 'D1_mec' + # because 'D2' is r-reachable from 'D1' + assert ("D2_mec", "D1_mec") in mech.graph.edges + # Check Utility nodes are r-reachable from decisions + assert ("U1_mec", "D1_mec") in mech.graph.edges + assert ("U2_mec", "D2_mec") in mech.graph.edges + + +def test_sufficient_recall_single(movie_star: MACID) -> None: + mech = MechanisedGraph(movie_star) + assert mech.is_sufficient_recall(2) + assert not mech.is_sufficient_recall(1) + + +def test_sufficient_recall_all(movie_star: MACID) -> None: + mech = MechanisedGraph(movie_star) + assert not mech.is_sufficient_recall() + + +if __name__ == "__main__": + pytest.main(sys.argv)