from collections import defaultdict
from functools import cached_property
from itertools import permutations
from typing import Self, Optional, Iterator, Iterable, Union, NamedTuple, TYPE_CHECKING
from pykappa.utils import Counted, IndexedSet
if TYPE_CHECKING:
from pykappa.mixture import Mixture
# String partner states can be: "#" (wildcard), "." (empty), "_" (bound), "?" (undetermined)
# "?" is the default in pattern instantiation and a wildcard in rules and observations
SiteType = NamedTuple("SiteType", [("site_name", str), ("agent_name", str)])
Partner = str | SiteType | int | Union["Site"]
[docs]
class Site(Counted):
"""Represents a site on an agent with state and binding partner information.
Attributes:
agent: The agent this site belongs to (set after initialization).
label: Name of the site.
state: Internal state of the site.
partner: Binding partner specification.
"""
agent: "Agent" # Expected to be set after initialization
def __init__(self, label: str, state: str, partner: Partner):
"""Initialize a site with label, state, and partner.
Args:
label: Name of the site.
state: Internal state of the site.
partner: Binding partner specification.
"""
super().__init__()
self.label = label
self.state = state
self.partner = partner
def __repr__(self):
return f'Site(id={self.id}, kappa_str="{self.kappa_str}")'
@property
def kappa_partner_str(self) -> str:
if self.partner == "?":
return ""
elif self.coupled:
return "[_]"
return f"[{self.partner}]"
@property
def kappa_state_str(self) -> str:
return "" if self.state == "?" else f"{{{self.state}}}"
@property
def kappa_str(self) -> str:
"""The site representation in Kappa format."""
return f"{self.label}{self.kappa_partner_str}{self.kappa_state_str}"
@property
def undetermined(self) -> bool:
"""Check if the site is in a state equivalent to leaving it unnamed in an agent."""
return self.state == "?" and self.partner in ("?", ".")
@property
def underspecified(self) -> bool:
"""Check if a concrete Site can be created from this pattern."""
return (
self.state == "#"
or self.partner in ("#", "_")
or isinstance(self.partner, SiteType)
)
@property
def stated(self) -> bool:
"""Check if the site has a specific internal state."""
return self.state not in ("#", "?")
@property
def bound(self) -> bool:
"""Check if the site is bound."""
return (
self.partner == "_"
or isinstance(self.partner, SiteType)
or isinstance(self.partner, Site)
)
@property
def coupled(self) -> bool:
"""Check if the site is coupled to a specific other site."""
return isinstance(self.partner, Site)
[docs]
def embeds_in(self, other: Self) -> bool:
"""Check whether self as a pattern matches other as a concrete site."""
if (self.stated and self.state != other.state) or (
self.bound and not other.coupled
):
return False
match self.partner:
case ".":
return other.partner == "."
case SiteType():
return (
self.partner.site_name == other.partner.label
and self.partner.agent_name == other.partner.agent.type
)
case Site():
return (
self.partner.agent.type == other.partner.agent.type
and self.label == other.label
)
return True
[docs]
class Agent(Counted):
"""Represents an agent with a type and collection of sites.
Attributes:
type: Type name of the agent.
interface: Dictionary mapping site labels to Site objects.
"""
[docs]
@classmethod
def from_kappa(cls, kappa_str: str) -> Self:
"""Parse a single agent from a Kappa string.
Raises:
AssertionError: If the string doesn't describe exactly one agent.
"""
from pykappa.parsing import kappa_parser, KappaTransformer
# Check pattern describes only a single agent
input_tree = kappa_parser.parse(kappa_str)
assert input_tree.data == "kappa_input"
assert len(input_tree.children) == 1
pattern_tree = input_tree.children[0]
assert pattern_tree.data == "pattern"
assert (
len(pattern_tree.children) == 1
), "Zero or more than one agent patterns were specified."
agent_tree = pattern_tree.children[0]
return KappaTransformer().transform(agent_tree)
def __init__(self, type: str, sites: Iterable[Site]):
"""Initialize an agent with given type and sites."""
super().__init__()
self.type = type
self.interface = {site.label: site for site in sites}
def __iter__(self):
yield from self.sites
def __getitem__(self, key: str) -> Site:
"""Get a site by its label."""
return self.interface[key]
def __repr__(self):
return f'Agent(id={self.id}, kappa_str="{self.kappa_str}")'
@property
def kappa_str(self):
"""The agent representation in Kappa format."""
return f"{self.type}({" ".join(site.kappa_str for site in self)})"
@property
def sites(self) -> Iterable[Site]:
"""All sites of this agent."""
yield from self.interface.values()
@cached_property
def underspecified(self) -> bool:
"""Check if a concrete Agent can be created from this pattern."""
return any(site.underspecified for site in self)
@property
def neighbors(self) -> list[Self]:
"""The agents directly connected to this one."""
return [site.partner.agent for site in self if site.coupled]
@property
def depth_first_traversal(self) -> list[Self]:
"""Perform depth-first traversal starting from this agent."""
visited = set()
traversal = []
stack = [self]
while stack:
if (agent := stack.pop()) not in visited:
visited.add(agent)
traversal.append(agent)
stack.extend(agent.neighbors)
return traversal
@property
def instantiable(self) -> bool:
"""Check if this agent pattern can be instantiated."""
return not any(site.underspecified for site in self)
[docs]
def detached(self) -> Self:
"""Create a clone with all sites emptied of partners."""
detached = type(self)(
self.type, [Site(site.label, site.state, ".") for site in self]
)
for site in detached:
site.agent = detached
return detached
[docs]
def isomorphic(self, other: Self) -> bool:
"""Check if two Agents are equivalent locally, ignoring partners.
Note:
Doesn't assume agents of the same type will have the same site signatures.
"""
if self.type != other.type:
return False
b_sites_leftover = set(other.interface)
for site_name, a_site in self.interface.items():
# Check that `b` has a site with the same name and state
if site_name in other.interface:
b_sites_leftover.remove(site_name)
if a_site.state != other[site_name].state:
return False
else:
if not a_site.undetermined:
return False
# Check that sites in `other` not mentioned in `self`are undetermined
return all(other[site_name].undetermined for site_name in b_sites_leftover)
[docs]
def embeds_in(self, other: Self) -> bool:
"""Check whether self as a pattern matches other as a concrete agent."""
if self.type != other.type:
return False
for a_site in self:
if a_site.label not in other.interface and not a_site.undetermined:
return False
b_site = other[a_site.label]
if not a_site.embeds_in(b_site):
return False
return True
[docs]
class Embedding(dict[Agent, Agent]):
"""Dictionary representing a mapping from pattern agents to mixture agents."""
def __hash__(self):
return hash(frozenset(self.items()))
def __repr__(self):
return f"Embedding({', '.join(f"{a.id}: {self[a].id}" for a in self)})"
[docs]
class Component(Counted):
"""A set of agents that are all in the same connected component.
Note:
Connectedness is not guaranteed statically and must be enforced.
"""
agents: IndexedSet[Agent]
n_copies: int
[docs]
@classmethod
def from_kappa(cls, kappa_str: str) -> Self:
"""Parse a single component from a Kappa string.
Raises:
AssertionError: If the pattern doesn't represent exactly one component.
"""
parsed_pattern = Pattern.from_kappa(kappa_str)
assert len(parsed_pattern.components) == 1
return parsed_pattern.components[0]
def __init__(self, agents: Iterable[Agent], n_copies: int = 1):
"""Initialize a component with agents.
Raises:
AssertionError: If agents list is empty or n_copies < 1.
NotImplementedError: If n_copies != 1 (not yet supported).
"""
super().__init__()
assert agents
assert n_copies >= 1
if n_copies != 1:
raise NotImplementedError(
"Simulations won't handle n_copies correctly in counting embeddings."
)
self.agents = IndexedSet(agents) # TODO: order by graph traversal
self.agents.create_index("type", lambda a: [a.type])
self.n_copies = n_copies
def __iter__(self):
yield from self.agents
def __len__(self):
return len(self.agents)
def __repr__(self):
return f'Component(id={self.id}, kappa_str="{self.kappa_str}")'
@property
def kappa_str(self) -> str:
"""The component representation in Kappa format."""
return Pattern.agents_to_kappa_str(self.agents)
[docs]
def add(self, agent: Agent):
"""Add an agent to this component."""
self.agents.add(agent)
[docs]
def isomorphic(self, other: Self) -> bool:
"""Check if two components are isomorphic."""
return next(self.isomorphisms(other), None) is not None
[docs]
def embeddings(
self, other: Self | "Mixture" | Iterable[Agent], exact: bool = False
) -> Iterator[Embedding]:
"""Find embeddings of self in other.
Args:
other: Target to find embeddings in.
exact: If True, finds isomorphisms instead of embeddings.
"""
if hasattr(other, "agents"):
other: IndexedSet[Agent] = other.agents
assert "type" in other.properties
a_root = next(iter(self.agents)) # "a" refers to `self` and "b" to `other`
# Narrow the search by mapping `a_root` to agents in `other` of the same type
for b_root in other.lookup("type", a_root.type):
agent_map = Embedding({a_root: b_root}) # The potential bijection
frontier = {a_root}
root_failed = False
while frontier and not root_failed:
a = frontier.pop()
b = agent_map[a]
match_func = a.isomorphic if exact else a.embeds_in
if not match_func(b):
root_failed = True
break
for a_site in a:
if a_site.label not in b.interface:
if not a_site.undetermined:
root_failed = True
break
else:
continue
b_site = b[a_site.label]
if a_site.coupled:
if not b_site.coupled:
root_failed = True
break
a_partner = a_site.partner.agent
b_partner = b_site.partner.agent
if b_partner not in other:
# The embedding must be enclosed in the given set of agents
root_failed = True
break
elif a_partner not in agent_map:
frontier.add(a_partner)
agent_map[a_partner] = b_partner
elif agent_map[a_site.partner.agent] != b_site.partner.agent:
root_failed = True
break
elif exact and a_site.partner != b_site.partner:
root_failed = True
break
if not root_failed:
yield agent_map # A valid bijection
[docs]
def isomorphisms(self, other: Self | "Mixture") -> Iterator[dict[Agent, Agent]]:
"""Find bijections which respect links in the site graph.
Checks for bijections ensuring that any internal site state specified
in one component exists and is the same in the other.
Note:
Handles isomorphism generally, between instantiated components
in a mixture and potentially between rule patterns.
"""
if len(self.agents) != len(other.agents):
return
yield from self.embeddings(other, exact=True)
@cached_property
def n_automorphisms(self) -> int:
"""Returns the number of automorphisms (i.e. isomorphisms onto itself)
of the component.
Note:
This uses a cached result, and thus should only be used
for static components, i.e. the ones in rules and observables.
Do not use this for components in a mixture that can change.
"""
return len(list(self.isomorphisms(self)))
@property
def diameter(self) -> int:
"""Get the maximum minimum shortest path between any two agents."""
def bfs_depth(root) -> int:
frontier = set([root])
seen = set()
depth = -1
while frontier:
depth += 1
new_frontier = set()
seen = seen | frontier
for cur in frontier:
for n in cur.neighbors:
if n not in seen:
new_frontier.add(n)
frontier = new_frontier
return depth
return max(bfs_depth(a) for a in self.agents)
[docs]
def plot(self, legend: bool = True) -> None:
"""Plot the component using graphviz."""
import colorsys
from graphviz import Source
agent_types = list(dict.fromkeys(a.type for a in self.agents))
type_color = {
t: "#{:02x}{:02x}{:02x}".format(
*[
int(c * 255)
for c in colorsys.hls_to_rgb(i / max(len(agent_types), 1), 0.4, 0.8)
]
)
for i, t in enumerate(agent_types)
}
edges = set()
for a in self.agents:
for b in a.neighbors:
if a is b:
continue
edges.add(tuple(sorted((id(a), id(b)))))
lines = [
"graph {",
" graph [overlap=false, splines=false, nodesep=0.05, ranksep=0.05];",
" layout=twopi;", # radial layout
' node [shape=circle, width=0.05, height=0.05, fixedsize=true, label="", style=filled];',
" edge [penwidth=0.3];",
]
if legend:
lines += [
" subgraph cluster_legend {",
' label="Legend"; style=filled; fillcolor=white; fontsize=10;',
" node [shape=circle, width=0.15, height=0.15, fixedsize=true, style=filled, fontsize=8, labelloc=r];",
]
for t, color in type_color.items():
lines.append(f' legend_{t} [label="{t}", fillcolor="{color}"];')
lines.append(" }")
for a in self.agents:
color = type_color[a.type]
lines.append(f' a{id(a)} [fillcolor="{color}"];')
for u, v in edges:
lines.append(f" a{u} -- a{v};")
lines.append("}")
return Source("\n".join(lines), engine="twopi")
[docs]
class Pattern:
"""A pattern consisting of multiple agents, some of which may be None (empty slots).
Attributes:
agents: List of agents, where None represents empty slots in rules.
"""
agents: list[Optional[Agent]]
[docs]
@classmethod
def from_kappa(cls, kappa_str: str) -> Self:
"""Parse a pattern from a Kappa string.
Args:
kappa_str: Kappa string describing a pattern.
Returns:
Parsed Pattern object.
Raises:
AssertionError: If the string doesn't describe exactly one pattern.
"""
from pykappa.parsing import kappa_parser, KappaTransformer
input_tree = kappa_parser.parse(kappa_str)
assert input_tree.data == "kappa_input"
assert (
len(input_tree.children) == 1
), "Zero or more than one patterns were specified."
assert len(input_tree.children) == 1
pattern_tree = input_tree.children[0]
return KappaTransformer().transform(pattern_tree)
def __init__(self, agents: list[Optional[Agent]]):
"""Compile a pattern from a list of Agents.
Replaces integer link states with references to actual partners, and
constructs helper objects for tracking connected components. A None
in agents represents an empty slot in a rule expression pattern.
Args:
agents: List of agents, where None represents empty slots.
Raises:
AssertionError: If integer links are malformed.
"""
self.agents = agents
# Parse site connections implied by integer LinkStates
integer_links: defaultdict[int, list[Site]] = defaultdict(list)
for agent in agents:
if agent is not None:
for site in agent:
if isinstance(site.partner, int):
integer_links[site.partner].append(site)
# Replace integer LinkStates with Agent references
for i in integer_links:
linked_sites = integer_links[i]
if len(linked_sites) == 1:
raise AssertionError(f"Site link {i} is only referenced in one site.")
elif len(linked_sites) > 2:
raise AssertionError(
f"Site link {i} is referenced in more than two sites."
)
else:
linked_sites[0].partner = linked_sites[1]
linked_sites[1].partner = linked_sites[0]
def __iter__(self) -> Iterator[Optional[Agent]]:
yield from self.agents
def __len__(self):
return len(self.agents)
@cached_property
def components(self) -> list[Component]:
"""The connected components in this pattern."""
unseen = set(agent for agent in self.agents if agent is not None)
components = []
while unseen:
component = Component(next(iter(unseen)).depth_first_traversal)
unseen = unseen.difference(component)
components.append(component)
return components
[docs]
@staticmethod
def agents_to_kappa_str(agents: Iterable[Optional[Agent]]) -> str:
"""Convert a collection of agents to Kappa string representation."""
bond_num_counter = 1
bond_nums: dict[Site, int] = dict()
agent_strs = []
for agent in agents:
if agent is None:
agent_strs.append(".")
continue
site_strs = []
for site in agent:
if site in bond_nums:
partner_str = f"[{bond_nums[site]}]"
elif site.coupled:
partner_str = f"[{bond_num_counter}]"
bond_nums[site.partner] = bond_num_counter
bond_num_counter += 1
else:
partner_str = "" if site.partner == "?" else f"[{site.partner}]"
site_strs.append(f"{site.label}{partner_str}{site.kappa_state_str}")
agent_strs.append(f"{agent.type}({" ".join(site_strs)})")
return ", ".join(agent_strs)
@property
def kappa_str(self) -> str:
"""The pattern representation in Kappa format."""
return type(self).agents_to_kappa_str(self.agents)
@cached_property
def underspecified(self) -> bool:
"""Check if any agents in the pattern are underspecified."""
return any(agent is None or agent.underspecified for agent in self.agents)
[docs]
def n_isomorphisms(self, other: Self) -> int:
"""Counts the number of bijections which respect links in the site graph.
Note:
Runtime is exponential in the number of components; use with caution.
"""
if len(self.components) != len(other.components):
return 0
res = 0
for perm in permutations(other.components):
temp = 1
for l, r in zip(self.components, perm):
temp *= len(list(l.isomorphisms(r)))
res += temp
return res