import os
import shutil
import tempfile
import random
import warnings
from collections import defaultdict
from functools import cached_property
from typing import Callable, Optional, Iterable, Self
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.figure
from pykappa.mixture import Mixture
from pykappa.rule import Rule, KappaRule, KappaRuleUnimolecular, KappaRuleBimolecular
from pykappa.pattern import Component, Pattern
from pykappa.expression import Expression
from pykappa.utils import str_table, equilibrated as _equilibrated
[docs]
class System:
"""A Kappa system containing agents, rules, observables, and variables for simulation.
Attributes:
mixture: The current state of agents and their connections.
rules: Dictionary mapping rule names to Rule objects.
observables: Dictionary mapping observable names to expressions.
variables: Dictionary mapping variable names to expressions.
monitor: Optional Monitor object for tracking simulation history.
time: Current simulation time.
tallies: Dictionary tracking rule application counts.
rng: Random number generator for reproducibility of updates.
"""
mixture: Mixture
rules: dict[str, Rule]
observables: dict[str, Expression]
variables: dict[str, Expression]
monitor: Optional["Monitor"]
time: float
tallies: defaultdict[str, dict[str, int]]
rng: random.Random
[docs]
@classmethod
def read_ka(cls, filepath: str, seed: Optional[int] = None) -> Self:
"""Read and parse a Kappa .ka file to create a System.
Args:
filepath: Path to the Kappa file.
seed: Random seed for reproducibility.
"""
with open(filepath) as f:
return cls.from_ka(f.read(), seed=seed)
[docs]
@classmethod
def from_ka(cls, ka_str: str, seed: Optional[int] = None) -> Self:
"""Create a System from a Kappa (.ka style) string.
Args:
ka_str: Kappa language string containing a system definition.
seed: Random seed for reproducibility.
"""
from pykappa.parsing import (
kappa_parser,
KappaTransformer,
ExpressionTransformer,
)
input_tree = kappa_parser.parse(ka_str)
assert input_tree.data == "kappa_input"
variables: dict[str, Expression] = {}
observables: dict[str, Expression] = {}
rules: list[Rule] = []
system_params: dict[str, int] = {}
inits: list[tuple[Expression, Pattern]] = []
for child in input_tree.children:
tag = child.data
if tag in ["f_rule", "fr_rule", "ambi_rule", "ambi_fr_rule"]:
new_rules = KappaTransformer().transform(child)
rules.extend(new_rules)
elif tag == "variable_declaration":
name_tree = child.children[0]
assert name_tree.data == "declared_variable_name"
name = name_tree.children[0].value.strip("'\"")
expr_tree = child.children[1]
assert expr_tree.data == "algebraic_expression"
value = ExpressionTransformer.from_tree(expr_tree)
variables[name] = value
elif tag == "plot_declaration":
raise NotImplementedError
elif tag == "observable_declaration":
label_tree = child.children[0]
assert isinstance(label_tree, str)
name = label_tree.strip("'\"")
expr_tree = child.children[1]
assert expr_tree.data == "algebraic_expression"
value = ExpressionTransformer.from_tree(expr_tree)
observables[name] = value
elif tag == "signature_declaration":
raise NotImplementedError
elif tag == "init_declaration":
expr_tree = child.children[0]
assert expr_tree.data == "algebraic_expression"
amount = ExpressionTransformer.from_tree(expr_tree)
pattern_tree = child.children[1]
if pattern_tree.data == "declared_token_name":
raise NotImplementedError
assert pattern_tree.data == "pattern"
pattern = KappaTransformer().transform(pattern_tree)
inits.append((amount, pattern))
elif tag == "declared_token":
raise NotImplementedError
elif tag == "definition":
reserved_name_tree = child.children[0]
assert reserved_name_tree.data == "reserved_name"
name = reserved_name_tree.children[0].value.strip("'\"")
value_tree = child.children[1]
assert value_tree.data == "value"
value = int(value_tree.children[0].value)
system_params[name] = value
elif tag == "pattern":
raise NotImplementedError
else:
raise TypeError(f"Unsupported input type: {tag}")
system = cls(None, rules, observables, variables, seed=seed)
for init in inits:
system.mixture.instantiate(init[1], int(init[0].evaluate(system)))
return system
[docs]
@classmethod
def from_kappa(
cls,
mixture: Optional[dict[str, int]] = None,
rules: Optional[Iterable[str]] = None,
observables: Optional[list[str] | dict[str, str]] = None,
variables: Optional[dict[str, str]] = None,
*args,
**kwargs,
) -> Self:
"""Create a System from Kappa strings.
Args:
mixture: Dictionary mapping agent patterns to initial counts.
rules: Iterable of rule strings in Kappa format.
observables: List of observable expressions or dict mapping names to expressions.
variables: Dictionary mapping variable names to expressions.
*args: Additional arguments passed to System constructor.
**kwargs: Additional keyword arguments passed to System constructor.
"""
real_rules = []
if rules is not None:
for rule in rules:
real_rules.extend(KappaRule.list_from_kappa(rule))
if observables is None:
real_observables = {}
elif isinstance(observables, list):
real_observables = {
f"o{i}": Expression.from_kappa(obs) for i, obs in enumerate(observables)
}
else:
real_observables = {
name: Expression.from_kappa(obs) for name, obs in observables.items()
}
real_variables = (
{}
if variables is None
else {name: Expression.from_kappa(var) for name, var in variables.items()}
)
return cls(
None if mixture is None else Mixture.from_kappa(mixture),
real_rules,
real_observables,
real_variables,
*args,
**kwargs,
)
def __init__(
self,
mixture: Optional[Mixture] = None,
rules: Optional[Iterable[Rule]] = None,
observables: Optional[dict[str, Expression]] = None,
variables: Optional[dict[str, Expression]] = None,
monitor: bool = True,
seed: Optional[int] = None,
):
"""Initialize a new System.
Args:
mixture: Initial mixture state.
rules: Collection of rules to apply.
observables: Dictionary of observable expressions.
variables: Dictionary of variable expressions.
monitor: Whether to enable monitoring of simulation history.
seed: Random seed for reproducibility.
"""
self.rng = random.Random() if seed is None else random.Random(seed)
self.rules = (
{} if rules is None else {f"r{i}": rule for i, rule in enumerate(rules)}
)
mixture = Mixture() if mixture is None else mixture
if any(
type(rule) in [KappaRuleUnimolecular, KappaRuleBimolecular]
for rule in self.rules.values()
):
mixture.enable_component_tracking()
self.observables = {} if observables is None else observables
self.variables = {} if variables is None else variables
self.set_mixture(mixture)
self.time = 0
self.tallies = defaultdict(lambda: {"applied": 0, "failed": 0})
self.monitor = Monitor(self) if monitor else None
def __str__(self):
return self.kappa_str
def __getitem__(self, name: str) -> int | float:
"""Get the value of an observable or variable.
Raises:
KeyError: If name doesn't correspond to any observable or variable.
"""
if name in self.observables:
return self.observables[name].evaluate(self)
elif name in self.variables:
return self.variables[name].evaluate(self)
else:
raise KeyError(
f"Name {name} doesn't correspond to a declared observable or variable"
)
def __setitem__(self, name: str, kappa_str: str):
"""Set or update an observable or variable from a Kappa string.
Args:
name: Name to assign to the expression.
kappa_str: Kappa expression string.
"""
expr = Expression.from_kappa(kappa_str)
self._track_expression(expr)
if name in self.variables:
self.variables[name] = expr
else: # Set new expressions as observables
self.observables[name] = expr
@property
def names(self) -> dict[str, set[str]]:
"""The names of all observables and variables."""
return {
"observables": set(self.observables),
"variables": set(self.variables),
}
@property
def tallies_str(self) -> str:
"""A formatted string showing how many times each rule has been applied."""
return str_table(
[
[str(rule), tallies["applied"], tallies["failed"]]
for rule, tallies in self.tallies.items()
],
header=["Rule", "Applied", "Failed"],
)
@property
def kappa_str(self) -> str:
"""The system representation in Kappa (.ka style) format."""
kappa_str = ""
for var_name, var in self.variables.items():
kappa_str += f"%var: '{var_name}' {var.kappa_str}\n"
for rule in self.rules.values():
assert isinstance(rule, KappaRule)
kappa_str += f"{rule.kappa_str}\n"
for obs_name, obs in self.observables.items():
obs_str = (
f"|{obs.kappa_str}|" if isinstance(obs, Component) else obs.kappa_str
)
kappa_str += f"%obs: '{obs_name}' {obs_str}\n"
kappa_str += self.mixture.kappa_str
return kappa_str
[docs]
def to_ka(self, filepath: str) -> None:
"""Write system information to a Kappa file."""
with open(filepath, "w") as f:
f.write(self.kappa_str)
[docs]
def set_mixture(self, mixture: Mixture) -> None:
"""Set the system's mixture and update tracking."""
self.mixture = mixture
for rule in self.rules.values():
self._track_rule(rule)
for observable in self.observables.values():
self._track_expression(observable)
for variable in self.variables.values():
self._track_expression(variable)
[docs]
def add_rule(self, rule: Rule | str, name: Optional[str] = None) -> None:
"""Add a new rule to the system.
Args:
rule: Rule object or Kappa string representation.
name: Name to assign to the rule. If None, a default name is generated.
Raises:
AssertionError: If a rule with the given name already exists.
"""
if name is None:
i = 0
while (name := f"r{i}") in self.rules:
i += 1
assert name not in self.rules, "Rule {name} already exists in the system"
if isinstance(rule, str):
rule = KappaRule.from_kappa(rule)
if type(rule) in [KappaRuleUnimolecular, KappaRuleBimolecular]:
self.mixture.enable_component_tracking()
self._track_rule(rule)
self.rules[name] = rule
[docs]
def remove_rule(self, name: str) -> None:
"""Remove a rule by setting its rate to zero.
Raises:
AssertionError: If the rule already has zero rate.
KeyError: If no rule with the given name exists.
"""
assert self.rules[name].rate(self) > 0, "Rule {name} is already null"
try:
self.rules[name].stochastic_rate = Expression.from_kappa("0")
except KeyError as e:
e.add_note("No rule {name} exists in the system")
raise e
def _track_rule(self, rule: Rule) -> None:
"""Track components mentioned in the left hand side of a Rule."""
if isinstance(rule, KappaRule):
for component in rule.left.components:
# TODO: For efficiency check for isomorphism with already-tracked components
self.mixture.track_component(component)
def _track_expression(self, expression: Expression) -> None:
"""Track the Components in the given expression.
Note:
Doesn't track patterns nested by indirection - see the filter method.
"""
for component_expr in expression.filter("component_pattern"):
self.mixture.track_component(component_expr.attrs["value"])
@cached_property
def rule_reactivities(self) -> list[float]:
"""The reactivity of each rule in the system."""
return [rule.reactivity(self) for rule in self.rules.values()]
@property
def reactivity(self) -> float:
"""The total reactivity of the system."""
return sum(self.rule_reactivities)
[docs]
def wait(self) -> None:
"""Advance simulation time according to exponential distribution.
Raises:
RuntimeWarning: If system has no reactivity (infinite wait time).
"""
try:
self.time += self.rng.expovariate(self.reactivity)
except ZeroDivisionError:
warnings.warn(
"system has no reactivity: infinite wait time", RuntimeWarning
)
[docs]
def choose_rule(self) -> Optional[Rule]:
"""Choose a rule to apply based on reactivity weights.
Returns:
Selected rule, or None if no rules have positive reactivity.
"""
try:
return self.rng.choices(
list(self.rules.values()), weights=self.rule_reactivities
)[0]
except ValueError:
warnings.warn("system has no reactivity: no rule applied", RuntimeWarning)
return None
[docs]
def apply_rule(self, rule: Rule) -> None:
"""Apply a rule to the mixture and update tallies."""
update = rule.select(self.mixture)
if update is not None:
self.tallies[str(rule)]["applied"] += 1
self.mixture.apply_update(update)
del self.__dict__["rule_reactivities"]
else:
self.tallies[str(rule)]["failed"] += 1
def _warn_about_rule_symmetries(self) -> None:
if any(
isinstance(rule, KappaRule) and rule.n_symmetries > 1
for rule in self.rules.values()
):
warnings.warn(
"Some rules have multiple symmetries; PyKappa normalizes reactivities correspondingly. "
"Results may differ from KaSim."
)
[docs]
def update(self) -> None:
"""Perform one simulation step."""
if self.monitor is not None and not self.monitor.history["time"]:
self.monitor.update() # Record initial state
self._warn_about_rule_symmetries()
self.wait()
if (rule := self.choose_rule()) is not None:
self.apply_rule(rule)
if self.monitor is not None:
self.monitor.update()
[docs]
def update_via_kasim(self, time: float) -> None:
"""Simulate for a given amount of time using KaSim.
Note:
KaSim must be installed and in the PATH.
Some features may not be compatible between PyKappa and KaSim.
Raises:
AssertionError: If KaSim is not found in PATH.
"""
self._warn_about_rule_symmetries()
assert shutil.which(
"KaSim"
), "To update via KaSim, it must be installed and in the PATH."
with tempfile.TemporaryDirectory() as tmpdirname:
# Run KaSim on the current system
output_ka_path = os.path.join(tmpdirname, "out.ka")
output_cmd = f'%mod: alarm {time} do $SNAPSHOT "{output_ka_path}";'
input_ka_path = os.path.join(tmpdirname, "in.ka")
with open(input_ka_path, "w") as f:
f.write(f"{self.kappa_str}\n{output_cmd}")
os.system(f"KaSim {input_ka_path} -l {time} -d {tmpdirname}")
# Read the KaSim output
output_kappa_str = ""
with open(output_ka_path) as f:
for line in f:
if line.startswith("%init"):
split = line.split("/")
output_kappa_str += split[0] + split[-1]
# Apply the update
self.set_mixture(System.from_ka(output_kappa_str).mixture)
self.time += time
if self.monitor:
self.monitor.update()
[docs]
def update_until(
self,
condition: Callable[[Self], bool],
max_time: Optional[float] = None,
max_updates: Optional[int] = None,
check_interval: int = 100,
) -> bool:
"""Run simulation until a condition of the system has been met.
Args:
condition: A function of a system that returns whether the desired condition is met.
max_time: Maximum simulation time (None for no limit).
max_updates: Maximum number of updates (None for no limit).
check_interval: Number of updates between condition checks.
Returns:
True if condition was met, False if limits were reached first.
"""
start_time = self.time
n_updates = 0
while True:
for _ in range(check_interval):
if (max_time is not None and self.time - start_time >= max_time) or (
max_updates is not None and n_updates >= max_updates
):
return False
self.update()
n_updates += 1
try:
if condition(self):
return True
except Exception as e:
warnings.warn(
f"Condition function raised an exception during update_until: {e}",
RuntimeWarning,
)
[docs]
class Monitor:
"""Records the history of the values of observables in a system.
Attributes:
system: The system being monitored.
history: Dictionary mapping observable names to their value history.
"""
system: System
history: dict[str, list[Optional[float]]]
def __init__(self, system: System):
"""Initialize a monitor for the given system."""
self.system = system
self.history = {"time": []} | {obs_name: [] for obs_name in system.observables}
def __len__(self) -> int:
"""The number of records."""
return len(self.history["time"])
[docs]
def update(self) -> None:
"""Record current time and observable values."""
self.history["time"].append(self.system.time)
for obs_name in self.system.observables:
if obs_name not in self.history:
self.history[obs_name] = [None] * (len(self.history["time"]) - 1)
self.history[obs_name].append(self.system[obs_name])
[docs]
def measure(self, observable_name: str, time: Optional[float] = None):
"""Get the value of an observable at a specific time.
Raises:
AssertionError: If simulation hasn't reached the specified time.
"""
import bisect
times: list[int] = list(self.history["time"])
if time is None:
time = times[-1]
assert time <= max(times), "Simulation hasn't reached time {time}"
return self.history[observable_name][bisect.bisect_right(times, time) - 1]
@property
def dataframe(self) -> pd.DataFrame:
"""Get the history of observable values as a pandas DataFrame.
Returns:
DataFrame with time and observable columns.
"""
return pd.DataFrame(self.history)
[docs]
def tail_mean(
self,
observable_name: str,
tail_fraction: float = 0.1,
) -> float:
"""
Calculate the average value of an observable over a fraction of the tail.
Args:
observable_name: Name of the observable to measure.
tail_fraction: Fraction of the history to consider (from the end).
Raises:
AssertionError: If there are not enough measurements.
"""
window_len = int(tail_fraction * len(self))
assert (
len(self) >= window_len and window_len >= 1
), f"Not enough measurements ({len(self)}) to calculate tail mean for {observable_name}"
values = np.asarray(self.history[observable_name][-window_len:], dtype=float)
return float(np.mean(values))
[docs]
def equilibrated(
self,
observable_name: Optional[str] = None,
**equilibration_kwargs,
) -> bool:
"""
Check if an observable (or all observables) has equilibrated based on
whether the slope of recent values is sufficiently small relative to the mean.
Args:
observable_name: Name of the observable to check. If None, checks all observables.
tail_fraction: Fraction of the history to consider.
tolerance: Maximum allowed fraction slope deviation from the mean.
"""
if observable_name is None:
return all(
self.equilibrated(obs_name, **equilibration_kwargs)
for obs_name in self.system.observables
)
values = self.history[observable_name]
times = self.history["time"]
assert all(v is not None for v in values)
assert all(t is not None for t in times)
return _equilibrated(values=values, times=times, **equilibration_kwargs)
[docs]
def plot(self, combined: bool = False) -> matplotlib.figure.Figure:
"""Make a plot of all observables over time.
Args:
combined: Whether to plot all observables on the same axes.
"""
if combined:
fig, ax = plt.subplots()
for obs_name in self.system.observables:
ax.plot(self.history["time"], self.history[obs_name], label=obs_name)
plt.legend()
plt.xlabel("Time")
plt.ylabel("Observable")
plt.margins(0, 0)
else:
fig, axs = plt.subplots(
len(self.system.observables), 1, sharex=True, layout="constrained"
)
if len(self.system.observables) == 1:
axs = [axs]
for i, obs_name in enumerate(self.system.observables):
axs[i].plot(self.history["time"], self.history[obs_name], color="black")
axs[i].set_title(obs_name)
if i == len(self.system.observables) - 1:
axs[i].set_xlabel("Time")
return fig