Source code for opengnc.simulation.events

from __future__ import annotations
import heapq
from collections.abc import Callable
from typing import Any


class Event:
    """
    A single discrete event to be processed at a specific time.

    Events order primarily by their trigger time ``t``. Events that share the
    same time order by creation sequence, so a heap of events releases
    simultaneous events in the order they were created (FIFO).
    """

    # Creation counter shared by all events. It is used only to break ties
    # between events with equal times, keeping heap ordering deterministic.
    _next_seq: int = 0

    def __init__(self, t: float, callback: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """
        Initialize a discrete event.

        Parameters
        ----------
        t : float
            Simulation time at which the event should trigger.
        callback : Callable
            Function to be executed when the event triggers.
        *args : Any
            Positional arguments forwarded to ``callback``.
        **kwargs : Any
            Keyword arguments forwarded to ``callback``.
        """
        self.t = t
        self.callback = callback
        self.args = args
        self.kwargs = kwargs
        # Record the creation order so equal-time events stay FIFO.
        self._seq = Event._next_seq
        Event._next_seq += 1

    def __lt__(self, other: Event) -> bool:
        """
        Order events by time, then by creation order.

        Returns ``NotImplemented`` when ``other`` is not an ``Event`` so that
        Python can fall back to the reflected comparison or raise ``TypeError``.
        """
        if not isinstance(other, Event):
            return NotImplemented
        if self.t != other.t:
            return self.t < other.t
        # Equal times: the event created first sorts first.
        return self._seq < other._seq

    def execute(self) -> Any:
        """
        Execute the event callback.

        Returns
        -------
        Any
            Whatever ``callback(*args, **kwargs)`` returns.
        """
        return self.callback(*self.args, **self.kwargs)
class EventQueue:
    """
    Priority queue managing discrete events in the simulation.

    Handles maneuver scheduling and mode transitions. Pending events are kept
    in a binary heap (``heapq``), so the earliest event is always at index 0.
    """

    def __init__(self) -> None:
        """Create an empty event queue."""
        self._events: list[Event] = []

    def schedule(self, t: float, callback: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """
        Schedule a new event.

        Parameters
        ----------
        t : float
            Time at which to run the callback.
        callback : Callable
            The function to execute.
        *args : Any
            Positional arguments forwarded to ``callback``.
        **kwargs : Any
            Keyword arguments forwarded to ``callback``.

        Raises
        ------
        ValueError
            If ``t`` is NaN. NaN compares false against every value, so it
            would break the heap ordering and the event could never become due.
        """
        # NaN is the only float that is not equal to itself.
        if t != t:
            raise ValueError("event time must not be NaN")
        heapq.heappush(self._events, Event(t, callback, *args, **kwargs))

    def has_events(self) -> bool:
        """Return True if there are pending events."""
        return bool(self._events)

    def next_event_time(self) -> float:
        """Return the time of the next pending event, or infinity if empty."""
        if not self._events:
            return float("inf")
        return self._events[0].t

    def process_until(self, current_time: float) -> None:
        """
        Process all events scheduled up to the provided current_time.

        Events are popped and executed in heap order while the earliest
        pending event satisfies ``t <= current_time``. A callback may schedule
        further events; any that are due at or before ``current_time`` are
        processed within the same call. Each event is removed from the queue
        before its callback runs, so an exception raised by a callback
        propagates to the caller with that event already consumed.

        Parameters
        ----------
        current_time : float
            The current simulation time.
        """
        while self._events and self._events[0].t <= current_time:
            heapq.heappop(self._events).execute()