Source code for pykappa.system

import os
import shutil
import tempfile
import random
import warnings
from collections import defaultdict
from functools import cached_property
from typing import Optional, Iterable, Self
from graphviz import Source

from pykappa.mixture import Mixture
from pykappa.rule import Rule, UnimolecularRule, BimolecularRule
from pykappa.pattern import Component, Pattern
from pykappa.analysis import Monitor
from pykappa._expression import Expression
from pykappa._utils import str_table


[docs] class System: """A Kappa system containing agents, rules, observables, and variables for simulation.""" mixture: Mixture #: The current state of agents and their connections rules: dict[str, Rule] #: Maps rule names to Rule objects observables: dict[str, Expression] #: Maps observable names to expressions variables: dict[str, Expression] #: Maps variable names to expressions monitor: Optional["Monitor"] #: Optionally tracks simulation history time: float #: Current simulation time tallies: defaultdict[str, dict[str, int]] #: Tracks rule applications _rng: random.Random # Random number generator for reproducibility of updates
[docs] @classmethod def read_ka(cls, filepath: str, seed: Optional[int] = None) -> Self: """Read and parse a Kappa .ka file to create a System. Args: filepath: Path to the Kappa file. seed: Random seed for reproducibility. """ with open(filepath) as f: return cls.from_ka(f.read(), seed=seed)
[docs] @classmethod def from_ka(cls, ka_str: str, seed: Optional[int] = None) -> Self: """Create a System from a Kappa (.ka style) string. Args: ka_str: Kappa language string containing a system definition. seed: Random seed for reproducibility. """ from pykappa._parsing import ( kappa_parser, KappaTransformer, ExpressionTransformer, ) input_tree = kappa_parser.parse(ka_str) assert input_tree.data == "kappa_input" variables: dict[str, Expression] = {} observables: dict[str, Expression] = {} rules: list[Rule] = [] system_params: dict[str, int] = {} inits: list[tuple[Expression, Pattern]] = [] for child in input_tree.children: tag = child.data if tag in ["f_rule", "fr_rule", "ambi_rule", "ambi_fr_rule"]: new_rules = KappaTransformer().transform(child) rules.extend(new_rules) elif tag == "variable_declaration": name_tree = child.children[0] assert name_tree.data == "declared_variable_name" name = name_tree.children[0].value.strip("'\"") expr_tree = child.children[1] assert expr_tree.data == "algebraic_expression" value = ExpressionTransformer.from_tree(expr_tree) variables[name] = value elif tag == "plot_declaration": raise NotImplementedError elif tag == "observable_declaration": label_tree = child.children[0] assert isinstance(label_tree, str) name = label_tree.strip("'\"") expr_tree = child.children[1] assert expr_tree.data == "algebraic_expression" value = ExpressionTransformer.from_tree(expr_tree) observables[name] = value elif tag == "signature_declaration": raise NotImplementedError elif tag == "init_declaration": expr_tree = child.children[0] assert expr_tree.data == "algebraic_expression" amount = ExpressionTransformer.from_tree(expr_tree) pattern_tree = child.children[1] if pattern_tree.data == "declared_token_name": raise NotImplementedError assert pattern_tree.data == "pattern" pattern = KappaTransformer().transform(pattern_tree) inits.append((amount, pattern)) elif tag == "declared_token": raise NotImplementedError elif tag == "definition": reserved_name_tree = child.children[0] assert reserved_name_tree.data == "reserved_name" name = reserved_name_tree.children[0].value.strip("'\"") value_tree = child.children[1] assert value_tree.data == "value" value = int(value_tree.children[0].value) system_params[name] = value elif tag == "pattern": raise NotImplementedError else: raise TypeError(f"Unsupported input type: {tag}") system = cls(None, rules, observables, variables, seed=seed) for init in inits: system.mixture.add(init[1], int(init[0].evaluate(system))) return system
[docs] @classmethod def from_kappa( cls, mixture: Optional[dict[str, int]] = None, rules: Optional[Iterable[str]] = None, observables: Optional[list[str] | dict[str, str]] = None, variables: Optional[dict[str, str]] = None, *args, **kwargs, ) -> Self: """Create a System from Kappa strings. Args: mixture: Dictionary mapping agent patterns to initial counts. rules: Iterable of rule strings in Kappa format. observables: List of observable expressions or dict mapping names to expressions. variables: Dictionary mapping variable names to expressions. *args: Additional arguments passed to System constructor. **kwargs: Additional keyword arguments passed to System constructor. """ real_rules = [] if rules is not None: for rule in rules: real_rules.extend(Rule.list_from_kappa(rule)) if observables is None: real_observables = {} elif isinstance(observables, list): real_observables = { f"o{i}": Expression.from_kappa(obs) for i, obs in enumerate(observables) } else: real_observables = { name: Expression.from_kappa(obs) for name, obs in observables.items() } real_variables = ( {} if variables is None else {name: Expression.from_kappa(var) for name, var in variables.items()} ) return cls( None if mixture is None else Mixture.from_kappa(mixture), real_rules, real_observables, real_variables, *args, **kwargs, )
def __init__( self, mixture: Optional[Mixture] = None, rules: Optional[Iterable[Rule]] = None, observables: Optional[dict[str, Expression]] = None, variables: Optional[dict[str, Expression]] = None, monitor: bool = True, seed: Optional[int] = None, ): """ Args: mixture: Initial mixture state. rules: Collection of rules to apply. observables: Dictionary of observable expressions. variables: Dictionary of variable expressions. monitor: Whether to enable monitoring of simulation history. seed: Random seed for reproducibility. """ self._rng = random.Random() if seed is None else random.Random(seed) self.rules = ( {} if rules is None else {f"r{i}": rule for i, rule in enumerate(rules)} ) mixture = Mixture() if mixture is None else mixture if any( type(rule) in [UnimolecularRule, BimolecularRule] for rule in self.rules.values() ): mixture.enable_component_tracking() self.observables = {} if observables is None else observables self.variables = {} if variables is None else variables self._set_mixture(mixture) self.time = 0 self.tallies = defaultdict(lambda: {"applied": 0, "failed": 0}) self.monitor = Monitor(self) if monitor else None def __str__(self): return self.kappa_str def __getitem__(self, name: str) -> int | float: """Get the value of an observable or variable. Raises: KeyError: If name doesn't correspond to any observable or variable. """ if name in self.observables: return self.observables[name].evaluate(self) elif name in self.variables: return self.variables[name].evaluate(self) else: raise KeyError( f"Name {name} doesn't correspond to a declared observable or variable" ) def __setitem__(self, name: str, kappa_str: str): """Set or update an observable or variable from a Kappa string. Args: name: Name to assign to the expression. kappa_str: Kappa expression string. """ expr = Expression.from_kappa(kappa_str) self._track_expression(expr) if name in self.variables: self.variables[name] = expr else: # Set new expressions as observables self.observables[name] = expr @property def names(self) -> dict[str, set[str]]: """The names of all observables and variables.""" return { "observables": set(self.observables), "variables": set(self.variables), } @property def tallies_str(self) -> str: """A formatted string showing how many times each rule has been applied.""" return str_table( [ [str(rule), tallies["applied"], tallies["failed"]] for rule, tallies in self.tallies.items() ], header=["Rule", "Applied", "Failed"], ) @property def _reversible_rules(self) -> list[tuple[str, str]]: """Find forward/reverse rule pairs by checking pattern symmetry.""" names = list(self.rules.keys()) pairs = [] used = set() for i, name_a in enumerate(names): if name_a in used: continue rule_a = self.rules[name_a] for name_b in names[i + 1 :]: if name_b in used: continue rule_b = self.rules[name_b] if rule_a.left.n_isomorphisms( rule_b.right ) and rule_a.right.n_isomorphisms(rule_b.left): pairs.append((name_a, name_b)) used.add(name_a) used.add(name_b) break return pairs @property def kappa_str(self) -> str: """The system representation in Kappa (.ka style) format.""" kappa_list = [] # Format reversible rules with <-> notation pairs = self._reversible_rules paired = {name for pair in pairs for name in pair} for fwd_name, rev_name in pairs: fwd = self.rules[fwd_name] rev = self.rules[rev_name] kappa_list.append( f"{fwd.left.kappa_str} <-> {fwd.right.kappa_str} " f"@ {fwd._rate_str}, {rev._rate_str}" ) # Otherwise format with -> notation for name, rule in self.rules.items(): if name not in paired: kappa_list.append(rule.kappa_str) for var_name, var in self.variables.items(): kappa_list.append(f"%var: '{var_name}' {var.kappa_str}") for obs_name, obs in self.observables.items(): obs_str = ( f"|{obs.kappa_str}|" if isinstance(obs, Component) else obs.kappa_str ) kappa_list.append(f"%obs: '{obs_name}' {obs_str}") kappa_list.append(self.mixture.kappa_str) return "\n".join(kappa_list)
[docs] def to_ka(self, filepath: str) -> None: """Write system information to a Kappa file.""" with open(filepath, "w") as f: f.write(self.kappa_str)
def _set_mixture(self, mixture: Mixture) -> None: """Set the system's mixture and update tracking.""" self.mixture = mixture for rule in self.rules.values(): self._track_rule(rule) for observable in self.observables.values(): self._track_expression(observable) for variable in self.variables.values(): self._track_expression(variable)
[docs] def add_rule(self, rule: Rule | str, name: Optional[str] = None) -> None: """Add a new rule to the system. Args: rule: Rule object or Kappa string representation. name: Name to assign to the rule. If None, a default name is generated. Raises: AssertionError: If a rule with the given name already exists. """ if name is None: i = 0 while (name := f"r{i}") in self.rules: i += 1 assert name not in self.rules, "Rule {name} already exists in the system" if isinstance(rule, str): rule = Rule.from_kappa(rule) if type(rule) in [UnimolecularRule, BimolecularRule]: self.mixture.enable_component_tracking() self._track_rule(rule) self.rules[name] = rule
[docs] def remove_rule(self, name: str) -> None: """Remove a rule by setting its rate to zero. Raises: AssertionError: If the rule already has zero rate. KeyError: If no rule with the given name exists. """ assert self.rules[name].rate(self) > 0, "Rule {name} is already null" try: self.rules[name].stochastic_rate = Expression.from_kappa("0") except KeyError as e: e.add_note("No rule {name} exists in the system") raise e
def _track_rule(self, rule: Rule) -> None: """Track components mentioned in the left hand side of a Rule.""" for component in rule.left.components: # TODO: For efficiency check for isomorphism with already-tracked components self.mixture._track_component(component) def _track_expression(self, expression: Expression) -> None: """Track the Components in the given expression. Note: Doesn't track patterns nested by indirection - see the filter method. """ for component_expr in expression.filter("component_pattern"): self.mixture._track_component(component_expr.attrs["value"]) @cached_property def rule_reactivities(self) -> list[float]: """The reactivity of each rule in the system.""" return [rule.reactivity(self) for rule in self.rules.values()] @property def reactivity(self) -> float: """The total reactivity of the system.""" return sum(self.rule_reactivities)
[docs] def wait(self) -> None: """Advance simulation time according to exponential distribution. Raises: RuntimeWarning: If system has no reactivity (infinite wait time). """ try: self.time += self._rng.expovariate(self.reactivity) except ZeroDivisionError: warnings.warn( "system has no reactivity: infinite wait time", RuntimeWarning )
def _choose_rule(self) -> Optional[Rule]: """Choose a rule to apply based on reactivity weights. Returns: Selected rule, or None if no rules have positive reactivity. """ try: return self._rng.choices( list(self.rules.values()), weights=self.rule_reactivities )[0] except ValueError: warnings.warn("system has no reactivity: no rule applied", RuntimeWarning) return None def _apply_rule(self, rule: Rule) -> None: """Apply a rule to the mixture and update tallies.""" update = rule._select(self.mixture) if update is not None: self.tallies[str(rule)]["applied"] += 1 self.mixture._apply_update(update) del self.__dict__["rule_reactivities"] else: self.tallies[str(rule)]["failed"] += 1
[docs] def update(self) -> None: """Perform one simulation step.""" if self.monitor is not None and not self.monitor.history["time"]: self.monitor.update() # Record initial state self.wait() if (rule := self._choose_rule()) is not None: self._apply_rule(rule) if self.monitor is not None: self.monitor.update()
[docs] def update_via_kasim(self, time: float) -> None: """Simulate for a given amount of time using KaSim. Note: KaSim must be installed and in the PATH. Some features are not compatible between PyKappa and KaSim. """ assert shutil.which("KaSim"), "KaSim not found in the PATH." if any(rule.n_symmetries > 1 for rule in self.rules.values()): warnings.warn( "Some rules have multiple symmetries. " "PyKappa normalizes reactivities accordingly: results may differ from KaSim." ) with tempfile.TemporaryDirectory() as tmpdirname: output_ka_path = os.path.join(tmpdirname, "out.ka") output_cmd = f'%mod: alarm {time} do $SNAPSHOT "{output_ka_path}";' input_ka_path = os.path.join(tmpdirname, "in.ka") # Run KaSim with open(input_ka_path, "w") as f: f.write(f"{self.kappa_str}\n{output_cmd}") os.system(f"KaSim {input_ka_path} -l {time} -d {tmpdirname} > /dev/null") # Read KaSim output with open(output_ka_path) as f: content = f.read() content = content.replace( ",\n", ", " ) # KaSim splits long components across lines output_kappa_str = "".join( line.split("/")[0] + line.split("/")[-1] for line in content.splitlines(keepends=True) if line.startswith("%init") ) # Apply the update self._set_mixture(System.from_ka(output_kappa_str).mixture) self.time += time if self.monitor: self.monitor.update()
[docs] def contact_map(self) -> Source: from pykappa.analysis import contact_map return contact_map(self)