Source code for opengnc.sensors.sensor

"""
Abstract base class for all sensors.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, cast

import numpy as np


[docs] class Sensor(ABC): """ Abstract Base Class for all sensors. Parameters ---------- name : str, optional Sensor name. Default "Sensor". """ def __init__(self, name: str = "Sensor") -> None: """Initialize sensor base.""" self.name = name self.fault_state: str | None = None self.stuck_value: np.ndarray | float | None = None
[docs] @abstractmethod def measure(self, *args: Any, **kwargs: Any) -> Any: """ Generate a measurement. Parameters ---------- *args : Any Input state(s) or signals required by the sensor. **kwargs : Any Additional parameters. Returns ------- Any Measured value. """ pass
[docs] def apply_calibration( self, value: np.ndarray | float, misalignment: np.ndarray | None = None, scale_factor: np.ndarray | float = 1.0, bias: np.ndarray | float | None = None, ) -> np.ndarray | float: """ Apply calibration residuals: val_cal = (I + M) * S * val_true + b. Parameters ---------- value : np.ndarray | float True value to be calibrated. misalignment : np.ndarray, optional Skew/misalignment matrix (3x3 for vectors). scale_factor : np.ndarray | float, optional Scale factor error (scalar or vector). Default is 1.0. bias : np.ndarray | float, optional Constant bias vector or scalar. Returns ------- np.ndarray | float Calibrated value. """ if isinstance(value, np.ndarray): # Misalignment (M) if misalignment is not None: # Assuming misalignment is a 3x3 matrix (skew) value = (np.eye(len(value)) + misalignment) @ value # Scale Factor (S) value = scale_factor * value # Bias (b) if bias is not None: value = value + bias else: # Scalar case value = scale_factor * value if bias is not None: value += bias return cast(np.ndarray | float, value)
[docs] def apply_fogm_noise( self, current_val: np.ndarray | float, sigma: float, tau: float, dt: float ) -> np.ndarray | float: """ Apply First-Order Gauss-Markov (FOGM) noise. x[k+1] = exp(-dt/tau) * x[k] + sigma * sqrt(1 - exp(-2*dt/tau)) * w[k] Parameters ---------- current_val : np.ndarray | float Current noise state value. sigma : float Steady-state standard deviation. tau : float Correlation time constant (s). dt : float Time step (s). Returns ------- np.ndarray | float Updated noise state. """ if sigma == 0 or tau <= 0: return cast(np.ndarray | float, current_val) phi = np.exp(-dt / tau) q = sigma * np.sqrt(1 - np.exp(-2 * dt / tau)) noise = np.random.normal(0, q, size=np.shape(current_val)) return cast(np.ndarray | float, phi * current_val + noise)
[docs] def apply_faults(self, value: np.ndarray | float) -> np.ndarray | float: """ Inject faults into the measurement. Parameters ---------- value : np.ndarray | float Clean measurement value. Returns ------- np.ndarray | float Faulted measurement value. """ if self.fault_state == "stuck": return cast(np.ndarray | float, self.stuck_value if self.stuck_value is not None else value) if self.fault_state == "spike": spike = np.random.normal( 0, 100 * np.std(value) if np.std(value) > 0 else 10.0, size=np.shape(value) ) return cast(np.ndarray | float, value + spike) if self.fault_state == "noise_increase": # Add extra noise return cast( np.ndarray | float, value + np.random.normal( 0, 10.0 * np.std(value) if np.std(value) > 0 else 1.0, size=np.shape(value) ), ) return cast(np.ndarray | float, value)
[docs] def add_gaussian_noise(self, value: np.ndarray | float, std_dev: float) -> np.ndarray | float: """ Helper to add zero-mean Gaussian noise. Parameters ---------- value : np.ndarray | float Nominal value. std_dev : float Standard deviation of noise. Returns ------- np.ndarray | float Noisy value. """ if std_dev is None or std_dev == 0: return cast(np.ndarray | float, value) noise = np.random.normal(0, std_dev, size=np.shape(value)) return cast(np.ndarray | float, value + noise)