import random
from math import prod
from typing import Optional, Self, TYPE_CHECKING
from functools import cached_property
from copy import deepcopy
from pykappa.pattern import Pattern, Component, Agent, Site
from pykappa.mixture import Mixture, _MixtureUpdate
from pykappa._expression import Expression
from pykappa._utils import rejection_sample
if TYPE_CHECKING:
from pykappa.system import System
# Useful constants
AVOGADRO = 6.02214e23
DIFFUSION_RATE = 1e9
KDS = {"weak": 1e-6, "moderate": 1e-7, "strong": 1e-8}
VOLUMES = {"fibro": 2.25e-12, "yeast": 4.2e-14}
ROOM_TEMPERATURE = 273.15 + 25
[docs]
class Rule:
"""Standard Kappa rule with left-hand side, right-hand side, and rate."""
left: Pattern
right: Pattern
stochastic_rate: Expression
[docs]
@classmethod
def list_from_kappa(cls, kappa_str: str) -> list[Self]:
"""Parse Kappa string into a list of rules.
Note:
Forward-reverse rules (with "<->") represent two rules.
"""
from pykappa._parsing import kappa_parser, KappaTransformer
input_tree = kappa_parser.parse(kappa_str)
assert input_tree.data == "kappa_input"
rule_tree = input_tree.children[0]
return KappaTransformer().transform(rule_tree)
[docs]
@classmethod
def from_kappa(cls, kappa_str: str) -> Self:
"""Parse a single Kappa rule from string.
Raises:
AssertionError: If the string represents more than one rule.
"""
rules = cls.list_from_kappa(kappa_str)
assert (
len(rules) == 1
), "The given rule expression represents more than one rule."
return rules[0]
def __init__(self, left: Pattern, right: Pattern, stochastic_rate: Expression):
self.left = left
self.right = right
self.stochastic_rate = stochastic_rate
def __post_init__(self):
l = len(self.left.agents)
r = len(self.right.agents)
assert (
l == r
), f"The left-hand side of this rule has {l} slots, but the right-hand side has {r}."
def __len__(self):
return len(self.left.agents)
def __iter__(self):
yield from zip(self.left.agents, self.right.agents)
def __repr__(self):
return f'{type(self).__name__}(kappa_str="{self.kappa_str}")'
def __str__(self):
return self.kappa_str
@property
def _rate_str(self) -> str:
return self.stochastic_rate.kappa_str
@property
def kappa_str(self) -> str:
return f"{self.left.kappa_str} -> {self.right.kappa_str} @ {self._rate_str}"
[docs]
def reactivity(self, system: "System") -> float:
"""Calculate the total reactivity of this rule in the given system,
i.e. the number of embeddings times the reaction rate, accounting
for rule symmetry.
"""
n_embeddings = self.n_embeddings(system.mixture)
n_symmetries = self.n_symmetries
assert n_embeddings % n_symmetries == 0
return n_embeddings // n_symmetries * self.rate(system)
@cached_property
def n_symmetries(self) -> int:
"""
The number of distinct automorphisms of the graph containing both left- and
right-hand side agents, augmented with edges between positionally corresponding agents.
For example, if a rule looks like "l1(...), l2(...) -> r1(...), r2(...)",
this method draws artifical edges between l1 and r1, and between l2 and r2,
then returns the number of symmetries of the resulting graph by counting
how many ways it can be mapped onto itself.
"""
left_agents = deepcopy(self.left.agents)
right_agents = deepcopy(self.right.agents)
for l, r in zip(left_agents, right_agents):
if l is not None and r is not None:
l_site = Site("__temp__", "?", partner=None)
r_site = Site("__temp__", "?", partner=None)
l_site.agent = l
l_site.partner = r_site
l_site.state = "left"
l.interface["__temp__"] = l_site
r_site.agent = r
r_site.partner = l_site
r_site.state = "right"
r.interface["__temp__"] = r_site
pattern = Pattern(left_agents + right_agents)
return pattern.n_isomorphisms(pattern)
[docs]
def rate(self, system: "System") -> float:
return self.stochastic_rate.evaluate(system)
[docs]
def n_embeddings(self, mixture: Mixture) -> int:
"""Count embeddings in the mixture.
Note:
This doesn't do any symmetry correction, though `System`
applies this correction when calculating rule reactivities.
"""
return prod(
len(mixture.embeddings(component)) for component in self.left.components
)
def _select(self, mixture: Mixture) -> Optional[_MixtureUpdate]:
"""Select agents and specify the update (or None for invalid match).
Note:
Can change the internal states of agents in the mixture but
records everything else in the MixtureUpdate.
"""
rule_embedding: dict[Agent, Agent] = {}
for component in self.left.components:
component_embeddings = mixture.embeddings(component)
assert (
len(component_embeddings) > 0
), f"A rule with no valid embeddings was selected: {self}"
component_embedding = random.choice(component_embeddings)
for rule_agent in component_embedding:
mixture_agent = component_embedding[rule_agent]
if mixture_agent in rule_embedding.values():
return None # Invalid match: two selected components intersect
else:
rule_embedding[rule_agent] = mixture_agent
return self._produce_update(rule_embedding, mixture)
def _produce_update(
self, selection_map: dict[Agent, Agent], mixture: Mixture
) -> _MixtureUpdate:
"""Produce an update specification from selected agents.
Args:
selection_map: Mapping from rule agents to mixture agents.
mixture: Current mixture state.
"""
selection = [
None if agent is None else selection_map[agent]
for agent in self.left.agents
] # Select agents in the mixture matching the rule, in order
new_selection: list[Optional[Agent]] = [None] * len(
selection
) # The new/modified agents used to make the appropriate edges
update = _MixtureUpdate()
# Manage agents
for i in range(len(self)):
l_agent = self.left.agents[i]
r_agent = self.right.agents[i]
agent: Optional[Agent] = selection[i]
match l_agent, r_agent:
case None, Agent():
new_selection[i] = update.create_agent(r_agent)
case Agent(), None:
update.remove_agent(agent)
case Agent(), Agent() if l_agent.type != r_agent.type:
update.remove_agent(agent)
new_selection[i] = update.create_agent(r_agent)
case Agent(), Agent() if l_agent.type == r_agent.type:
for r_site in r_agent:
if r_site.stated:
agent[r_site.label].state = r_site.state
if r_site.state != l_agent[r_site.label].state:
update.register_changed_agent(agent)
new_selection[i] = agent
case _:
pass
# Manage explicitly referenced edges
for i, r_agent in enumerate(self.right.agents):
if r_agent is None:
continue
agent = new_selection[i]
for r_site in r_agent:
site = agent[r_site.label]
match r_site.partner:
case Site() as r_partner:
partner_idx = self.right.agents.index(r_partner.agent)
partner = new_selection[partner_idx][r_partner.label]
update.connect_sites(site, partner)
case ".":
update.disconnect_site(site)
case x if (
x != "?"
and self.left.agents[i]
and x != self.left.agents[i][r_site.label].partner
):
raise TypeError(
f"Site partners of type {x} are unsupported for right-hand rule patterns, unless they remain unchanged from the left-hand side."
)
return update
[docs]
class UnimolecularRule(Rule):
"""Rule that acts within a single component."""
_component_weights: dict[Component, int] # Cache of embedding weights per component
def __post_init__(self):
super().__post_init__()
self._component_weights = {}
@property
def _rate_str(self) -> str:
return f"0 {{{self.stochastic_rate.kappa_str}}}"
[docs]
def n_embeddings(self, mixture: Mixture) -> int:
"""Count the total number of embeddings in the mixture."""
count = 0
self._component_weights = {}
for component in mixture.components:
weight = prod(
len(mixture.embeddings_in_component(match_component, component))
for match_component in self.left.components
)
self._component_weights[component] = weight
count += weight
return count
def _select(self, mixture: Mixture) -> Optional[_MixtureUpdate]:
"""Select agents and specify the update (or None for invalid match).
Note:
n_embeddings must be called before this method so that the
component_weights cache is up-to-date.
"""
components_ordered = list(self._component_weights)
weights = [self._component_weights[c] for c in components_ordered]
selected_component = random.choices(components_ordered, weights)[0]
selection_map: dict[Agent, Agent] = {}
for component in self.left.components:
choices = mixture.embeddings_in_component(component, selected_component)
assert (
len(choices) > 0
), f"A rule with no valid embeddings was selected: {self}"
component_selection = random.choice(choices)
for agent in component_selection:
if component_selection[agent] in selection_map.values():
return None
else:
selection_map[agent] = component_selection[agent]
return self._produce_update(selection_map, mixture)
[docs]
class BimolecularRule(Rule):
"""Rule that acts between two distinct components."""
_component_weights: dict[Component, int] # Cache of embedding weights per component
def __post_init__(self):
super().__post_init__()
self._component_weights = {}
assert (
len(self.left.components) == 2
), "Bimolecular rule patterns must consist of exactly 2 components."
@property
def _rate_str(self) -> str:
return super()._rate_str + " {0}"
[docs]
def n_embeddings(self, mixture: Mixture) -> int:
"""Count the total number of embeddings in the mixture."""
count = 0
self._component_weights = {}
for component in mixture.components:
n_match1 = len(
mixture.embeddings_in_component(self.left.components[0], component)
)
n_match2 = len(mixture.embeddings(self.left.components[1])) - len(
mixture.embeddings_in_component(self.left.components[1], component)
)
weight = n_match1 * n_match2
self._component_weights[component] = weight
count += weight
return count
def _select(self, mixture: Mixture) -> Optional[_MixtureUpdate]:
"""Select agents and specify the update (or None for invalid match).
Note:
n_embeddings must be called before this method so that the
component_weights cache is up-to-date.
"""
components_ordered = list(self._component_weights.keys())
weights = [self._component_weights[c] for c in components_ordered]
selected_component = random.choices(components_ordered, weights)[0]
match1 = random.choice(
mixture.embeddings_in_component(self.left.components[0], selected_component)
)
match2 = rejection_sample(
mixture.embeddings(self.left.components[1]),
mixture.embeddings_in_component(
self.left.components[1], selected_component
),
)
return self._produce_update(match1 | match2, mixture)