import math
import colorsys
from typing import TYPE_CHECKING, Optional
import numpy as np
import pandas as pd
from graphviz import Source
import matplotlib.pyplot as plt
import matplotlib.figure
if TYPE_CHECKING:
from pykappa.pattern import Component
from pykappa.system import System
class _ComponentPlot:
"""Stable visualization of a Component across simulation steps."""
def __init__(self, component: "Component"):
self.component = component
self._positions: dict[int, tuple[float, float]] = {}
def _compute_positions(self) -> dict[int, tuple[float, float]]:
"""Assign each agent a fixed position based on identity, computed once."""
new_agents = [a for a in self.component.agents if id(a) not in self._positions]
if new_agents:
# Place new agents on a sunflower spiral (evenly distributed, deterministic)
n = len(self._positions)
golden_angle = math.pi * (3 - math.sqrt(5))
for i, agent in enumerate(new_agents):
k = n + i
r = math.sqrt(k + 1)
angle = k * golden_angle
self._positions[id(agent)] = (r * math.cos(angle), r * math.sin(angle))
return {id(a): self._positions[id(a)] for a in self.component.agents}
def __call__(self, legend: bool = True):
agent_types = sorted(dict.fromkeys(a.type for a in self.component.agents))
type_color = {
t: "#{:02x}{:02x}{:02x}".format(
*[
int(c * 255)
for c in colorsys.hls_to_rgb(i / len(agent_types), 0.4, 0.8)
]
)
for i, t in enumerate(agent_types)
}
edges = set()
for a in self.component.agents:
for b in a.neighbors:
if a is b:
continue
edges.add(tuple(sorted((id(a), id(b)))))
pos = self._compute_positions()
lines = [
"graph {",
" graph [overlap=false];",
' node [shape=circle, width=0.05, height=0.05, fixedsize=true, label="", style=filled];',
" edge [penwidth=0.3];",
]
if legend:
min_y = min(y for x, y in pos.values())
max_x = max(x for x, y in pos.values())
lx = max_x + 2.0
legend_vertical_spacing = 0.5
for i, (t, color) in enumerate(reversed(type_color.items())):
ly = min_y + i * legend_vertical_spacing
lines.append(
f' legend_{t} [shape=box, style=filled, fillcolor="{color}", '
f'label="{t}", fontsize=8, fixedsize=false, margin="0.05,0.02", pos="{lx:.3f},{ly:.3f}!"];'
)
for a in self.component.agents:
color = type_color[a.type]
x, y = pos[id(a)]
lines.append(f' a{id(a)} [fillcolor="{color}", pos="{x:.3f},{y:.3f}!"];')
for u, v in edges:
lines.append(f" a{u} -- a{v};")
lines.append("}")
return Source("\n".join(lines), engine="neato")
[docs]
class Monitor:
"""Records the history of the values of observables in a system.
Attributes:
system: The system being monitored.
history: Dictionary mapping observable names to their value history.
"""
system: "System"
history: dict[str, list[Optional[float]]]
def __init__(self, system: "System"):
"""Initialize a monitor for the given system."""
self.system = system
self.history = {"time": []} | {obs_name: [] for obs_name in system.observables}
def __len__(self) -> int:
"""The number of records."""
return len(self.history["time"])
[docs]
def update(self) -> None:
"""Record current time and observable values."""
self.history["time"].append(self.system.time)
for obs_name in self.system.observables:
if obs_name not in self.history:
self.history[obs_name] = [None] * (len(self.history["time"]) - 1)
self.history[obs_name].append(self.system[obs_name])
[docs]
def measure(self, observable_name: str, time: Optional[float] = None):
"""Get the value of an observable at a specific time.
Raises:
AssertionError: If simulation hasn't reached the specified time.
"""
import bisect
times: list[int] = list(self.history["time"])
if time is None:
time = times[-1]
assert time <= max(times), "Simulation hasn't reached time {time}"
return self.history[observable_name][bisect.bisect_right(times, time) - 1]
@property
def dataframe(self) -> pd.DataFrame:
"""Get the history of observable values as a pandas DataFrame.
Returns:
DataFrame with time and observable columns.
"""
return pd.DataFrame(self.history)
[docs]
def tail_mean(
self,
observable_name: str,
tail_fraction: float = 0.1,
) -> float:
"""
Calculate the average value of an observable over a fraction of the tail.
Args:
observable_name: Name of the observable to measure.
tail_fraction: Fraction of the history to consider (from the end).
Raises:
AssertionError: If there are not enough measurements.
"""
window_len = int(tail_fraction * len(self))
assert (
len(self) >= window_len and window_len >= 1
), f"Not enough measurements ({len(self)}) to calculate tail mean for {observable_name}"
values = np.asarray(self.history[observable_name][-window_len:], dtype=float)
return float(np.mean(values))
[docs]
def equilibrated(
self,
observable_name: Optional[str] = None,
**equilibration_kwargs,
) -> bool:
"""
Check if an observable (or all observables) has equilibrated based on
whether the slope of recent values is sufficiently small relative to the mean.
Args:
observable_name: Name of the observable to check. If None, checks all observables.
tail_fraction: Fraction of the history to consider.
tolerance: Maximum allowed fraction slope deviation from the mean.
"""
if observable_name is None:
return all(
self.equilibrated(obs_name, **equilibration_kwargs)
for obs_name in self.system.observables
)
values = self.history[observable_name]
times = self.history["time"]
assert all(v is not None for v in values)
assert all(t is not None for t in times)
return equilibrated(values=values, times=times, **equilibration_kwargs)
[docs]
def plot(self, combined: bool = False) -> matplotlib.figure.Figure:
"""Make a plot of all observables over time.
Args:
combined: Whether to plot all observables on the same axes.
"""
if combined:
fig, ax = plt.subplots()
for obs_name in self.system.observables:
ax.plot(self.history["time"], self.history[obs_name], label=obs_name)
plt.legend()
plt.xlabel("Time")
plt.ylabel("Observable")
plt.margins(0, 0)
else:
fig, axs = plt.subplots(
len(self.system.observables), 1, sharex=True, layout="constrained"
)
if len(self.system.observables) == 1:
axs = [axs]
for i, obs_name in enumerate(self.system.observables):
axs[i].plot(self.history["time"], self.history[obs_name], color="black")
axs[i].set_title(obs_name)
if i == len(self.system.observables) - 1:
axs[i].set_xlabel("Time")
return fig
[docs]
def relative_slope(
values: list[float], times: Optional[list[float]] = None, tail_fraction: float = 0.1
) -> float:
"""
Computes the magnitude of the slope of the tail of the series.
Time can be provided to account for non-uniform sampling intervals.
Raises:
AssertionError: If there are not enough measurements to compute the slope.
"""
times = times if times is not None else list(range(len(values)))
t_tail = times[-1] - tail_fraction * (times[-1] - times[0])
tail_indices = [i for i, t in enumerate(times) if t >= t_tail]
assert (
len(tail_indices) >= 2
), f"Not enough measurements ({len(tail_indices)}) to compute slope"
tail_times = [times[i] for i in tail_indices]
tail_values = [values[i] for i in tail_indices]
slope, _ = np.polyfit(tail_times, tail_values, deg=1)
return float(slope / np.mean(tail_values))
[docs]
def equilibrated(
values: list[float],
times: Optional[list[float]] = None,
tail_fraction: float = 0.1,
tolerance: float = 0.01,
) -> bool:
"""
Checks whether the magnitude of the slope of the tail of the series relative to the mean
is sufficiently small (below tolerance). Time can be provided to account for non-uniform
sampling intervals.
"""
return abs(relative_slope(values, times, tail_fraction)) <= tolerance
[docs]
def equilibration_time(
values: list[float],
times: Optional[list[float]] = None,
min_tail_length: int = 2,
tolerance: float = 0.01,
) -> float:
"""Earliest time from which the remaining series is equilibrated."""
times = times if times is not None else list(range(len(values)))
for i in range(len(values) - min_tail_length + 1):
if abs(relative_slope(values[i:], times[i:], tail_fraction=1.0)) <= tolerance:
return times[i]
raise ValueError(f"Equilibrium not detected (tolerance={tolerance})")
[docs]
def equilibrium_value(
values: list[float],
times: Optional[list[float]] = None,
min_tail_length: int = 2,
tolerance: float = 0.01,
) -> float:
"""Mean of the series from the equilibration point onward."""
times = times if times is not None else list(range(len(values)))
eq_time = equilibration_time(values, times, min_tail_length, tolerance)
eq_index = next(i for i, t in enumerate(times) if t >= eq_time)
return float(np.mean(values[eq_index:]))