Source code for opengnc.kalman_filters.akf

"""
Adaptive Kalman Filter (AKF) with online covariance estimation (Myers-Tapley).
"""

import numpy as np
from typing import cast

from opengnc.kalman_filters.kf import KF


[docs] class AKF(KF): """ Adaptive Kalman Filter (AKF) using Myers-Tapley online covariance estimation. Estimates process noise covariance (Q) and measurement noise covariance (R) online using the innovation sequence within a moving window. Parameters ---------- dim_x : int Dimension of the state vector. dim_z : int Dimension of the measurement vector. window_size : int, optional Moving window size (N) for covariance estimation. Default is 20. """ def __init__(self, dim_x: int, dim_z: int, window_size: int = 20) -> None: super().__init__(dim_x, dim_z) self.window_size = window_size self.innovations: list[np.ndarray] = [] self.h_list: list[np.ndarray] = [] self.p_minus_list: list[np.ndarray] = [] self.fpf_t_list: list[np.ndarray] = [] self.dx_list: list[np.ndarray] = [] self.p_plus_list: list[np.ndarray] = []
[docs] def predict( self, u: np.ndarray | None = None, f_mat: np.ndarray | None = None, q_mat: np.ndarray | None = None, b_mat: np.ndarray | None = None, ) -> None: """ Predict step (stores history for adaptation). Parameters ---------- u : np.ndarray, optional Control input vector. f_mat : np.ndarray, optional State transition matrix. q_mat : np.ndarray, optional Process noise covariance. b_mat : np.ndarray, optional Control input matrix. """ f_curr = f_mat if f_mat is not None else self.F # Store F * P_{k-1|k-1} * F^T for Q estimation self.fpf_t_list.append(f_curr @ self.P @ f_curr.T) super().predict(u, f_mat, q_mat, b_mat) # Store P_{k|k-1} for R estimation self.p_minus_list.append(self.P.copy()) if len(self.p_minus_list) > self.window_size: self.p_minus_list.pop(0) self.fpf_t_list.pop(0)
[docs] def update( self, z: np.ndarray, h_mat: np.ndarray | None = None, r_mat: np.ndarray | None = None ) -> None: """ Update step with online R and Q adaptation. Parameters ---------- z : np.ndarray Measurement vector. h_mat : np.ndarray, optional Measurement matrix. r_mat : np.ndarray, optional Measurement noise covariance. """ h_curr = h_mat if h_mat is not None else self.H x_minus = self.x.copy() innov = z - (h_curr @ self.x) super().update(z, h_mat, r_mat) dx = self.x - x_minus self.innovations.append(innov) self.h_list.append(h_curr) self.dx_list.append(dx) self.p_plus_list.append(self.P.copy()) if len(self.innovations) > self.window_size: self.innovations.pop(0) self.h_list.pop(0) self.dx_list.pop(0) self.p_plus_list.pop(0) # Perform adaptation if we have enough samples if len(self.innovations) >= self.window_size: self._adapt_noise_covariances()
def _adapt_noise_covariances(self) -> None: """Estimates Q and R based on the history of innovations and state corrections.""" n_val = self.window_size sum_yyt = np.zeros((self.dim_z, self.dim_z)) sum_hpht = np.zeros((self.dim_z, self.dim_z)) sum_q = np.zeros((self.dim_x, self.dim_x)) for i in range(n_val): y_innov = self.innovations[i] h_mat = self.h_list[i] p_minus = self.p_minus_list[i] dx_corr = self.dx_list[i] p_plus = self.p_plus_list[i] fpf_t = self.fpf_t_list[i] sum_yyt += np.outer(y_innov, y_innov) sum_hpht += h_mat @ p_minus @ h_mat.T # Myers-Tapley formula for Q sum_q += np.outer(dx_corr, dx_corr) + p_plus - fpf_t r_hat = (1.0 / n_val) * sum_yyt - (1.0 / n_val) * sum_hpht self.R = self._make_psd(r_hat) q_hat = (1.0 / n_val) * sum_q self.Q = self._make_psd(q_hat) def _make_psd(self, matrix: np.ndarray) -> np.ndarray: """ Force a matrix to be positive semi-definite by clipping negative eigenvalues. Parameters ---------- matrix : np.ndarray Input symmetric matrix. Returns ------- np.ndarray PSD matrix. """ eigenvalues, eigenvectors = np.linalg.eigh(matrix) eigenvalues = np.maximum(eigenvalues, 1e-6) return cast(np.ndarray, eigenvectors @ np.diag(eigenvalues) @ eigenvectors.T)