Source code for opengnc.simulation.simulator

from collections.abc import Callable
from typing import Any

from .events import EventQueue
from .logging import SimulationLogger

# Type aliases for simulation callbacks
PropagatorFunc = Callable[[float, Any, float, Any], Any]
SensorFunc = Callable[[float, Any], Any]
EstimatorFunc = Callable[[float, Any], Any]
ControllerFunc = Callable[[float, Any], Any]


[docs] class MissionSimulator: r""" High-Fidelity Mission Simulator. Loop Sequence: 1. Update Sense: $\mathbf{y} = h(t, \mathbf{x}) + \mathbf{v}$ 2. Estimate: $\hat{\mathbf{x}} = f(t, \mathbf{y})$ 3. Control: $\mathbf{u} = g(t, \hat{\mathbf{x}})$ 4. Propagate: $\dot{\mathbf{x}} = f(t, \mathbf{x}, \mathbf{u})$ Parameters ---------- propagator : PropagatorFunc Truth propagation: $(t, x, dt, u) \to x_{new}$ sensor_model : Optional[SensorFunc] Measurement model: $(t, x) \to y$ estimator : Optional[EstimatorFunc] State estimation: $(t, y) \to \hat{x}$ controller : Optional[ControllerFunc] Control law: $(t, \hat{x}) \to u$ logger : Optional[SimulationLogger] Telemetry recording. """ def __init__( self, propagator: PropagatorFunc, sensor_model: SensorFunc | None = None, estimator: EstimatorFunc | None = None, controller: ControllerFunc | None = None, logger: SimulationLogger | None = None, ) -> None: """Initialize simulator core and event management.""" self.propagator = propagator self.sensor_model = sensor_model self.estimator = estimator self.controller = controller self.logger = logger self.event_queue = EventQueue() self.time: float = 0.0 self.state: Any = None
[docs] def initialize(self, t0: float, initial_state: Any) -> None: """ Set the simulation starting epoch and state. Parameters ---------- t0 : float Start time (s). initial_state : Any Initial truth state vector or object. """ self.time = t0 self.state = initial_state
[docs] def schedule_event(self, t: float, callback: Callable[..., Any], *args: Any, **kwargs: Any) -> None: """ Register a discrete event for future execution. Parameters ---------- t : float Target execution time (s). callback : Callable Function to execute at time $t$. """ self.event_queue.schedule(t, callback, *args, **kwargs)
[docs] def step(self, dt: float) -> None: """ Execute a single fixed-step simulation frame. Sequence: 1. Process pending discrete events. 2. Generate synthetic measurements (Sense). 3. Solve for state estimate (Estimate). 4. Compute control effort (Control). 5. Log data series. 6. Advance physics (Propagate). Parameters ---------- dt : float Simulation time step (s). """ # 1. Process discrete events self.event_queue.process_until(self.time) # 2. Sense meas = self.sensor_model(self.time, self.state) if self.sensor_model else None # 3. Estimate est = self.estimator(self.time, meas) if self.estimator else None # 4. Control # Fallback to perfect knowledge if no estimator ctrl_ctx = est if est is not None else self.state u = self.controller(self.time, ctrl_ctx) if self.controller else None # 5. Record if self.logger: self.logger.log(self.time, self.state, meas, est, u) # 6. Propagate Truth self.state = self.propagator(self.time, self.state, dt, u) self.time += dt
[docs] def run(self, t_end: float, dt: float) -> Any: """ Execute the simulation loop until the termination time. Parameters ---------- t_end : float Simulation stop time (s). dt : float Fixed integration step (s). Returns ------- Any Returns logger history if available, otherwise simulation end state. """ while self.time <= t_end: self.step(dt) return self.logger.history if self.logger else self.state