Source code for ADCS.simulate_remote

from __future__ import annotations

__all__ = ["simulate_remote"]

import time
from typing import Optional

import numpy as np

from ADCS.CONOPS.goals import Goal
from ADCS.CONOPS.goallist import GoalList
from ADCS.controller.controller import Controller
from ADCS.estimators.attitude_estimators import Attitude_Estimator
from ADCS.estimators.orbit_estimators import Orbit_Estimator
from ADCS.helpers.simresults import SimulationResults
from ADCS.orbits.orbital_state import Orbital_State
from ADCS.remote.controller_rpc import (
    ComponentLocation,
    RemoteControllerProxy,
    RemoteAttitudeEstimatorProxy,
    RemoteOrbitEstimatorProxy,
    RemoteSimulationConfig,
)
from ADCS.satellite_hardware.satellite import EstimatedSatellite, Satellite
from ADCS.simulate import simulate


def _as_float_array(values) -> np.ndarray:
    """Convert arbitrary timing history input into a finite 1-D float array.

    :param values:
        Input values from a run-history field. May be ``None`` or any array-like
        object.
    :type values:
        Any

    :return:
        Flattened array containing only finite float values.
    :rtype:
        numpy.ndarray
    """
    if values is None:
        return np.asarray([], dtype=float)
    arr = np.asarray(values, dtype=float).reshape(-1)
    if arr.size == 0:
        return arr
    return arr[np.isfinite(arr)]


def _fmt_seconds(value: float | None) -> str:
    """Format a duration in seconds and milliseconds for human-readable output.

    :param value:
        Duration in seconds.
    :type value:
        float or None

    :return:
        Formatted string, or ``"N/A"`` for missing/non-finite values.
    :rtype:
        str
    """
    if value is None or not np.isfinite(value):
        return "N/A"
    return f"{value:.6f} s ({value * 1e3:.3f} ms)"


def _fmt_ms(value: float | None) -> str:
    """Format a duration in milliseconds for compact summary output.

    :param value:
        Duration in seconds.
    :type value:
        float or None

    :return:
        Millisecond-formatted string, or ``"N/A"`` for missing/non-finite values.
    :rtype:
        str
    """
    if value is None or not np.isfinite(value):
        return "N/A"
    return f"{value * 1e3:.3f} ms"


def _safe_mean(arr: np.ndarray) -> float | None:
    """Return mean of an array or ``None`` when empty.

    :param arr:
        Input array.
    :type arr:
        numpy.ndarray

    :return:
        Arithmetic mean as float, or ``None`` if ``arr`` has no elements.
    :rtype:
        float or None
    """
    if arr.size == 0:
        return None
    return float(np.mean(arr))


def _safe_percentile(arr: np.ndarray, q: float) -> float | None:
    """Return percentile of an array or ``None`` when empty.

    :param arr:
        Input array.
    :type arr:
        numpy.ndarray

    :param q:
        Percentile in the range ``[0, 100]``.
    :type q:
        float

    :return:
        Requested percentile as float, or ``None`` if ``arr`` has no elements.
    :rtype:
        float or None
    """
    if arr.size == 0:
        return None
    return float(np.percentile(arr, q))


def _print_hil_timing_summary(
    *,
    results: SimulationResults,
    remote: RemoteSimulationConfig,
    wall_clock_s: float,
) -> None:
    """Print a concise timing breakdown for a remote simulation run.

    The summary reports wall-clock runtime and averaged/p-percentile timing for
    local environment work, dynamics propagation, RPC round-trip, and server
    compute time.

    :param results:
        Simulation results containing per-step timing histories.
    :type results:
        :class:`~ADCS.helpers.simresults.SimulationResults`

    :param remote:
        Remote simulation configuration used for endpoint context.
    :type remote:
        :class:`~ADCS.remote.controller_rpc.RemoteSimulationConfig`

    :param wall_clock_s:
        End-to-end wall-clock runtime in seconds.
    :type wall_clock_s:
        float
    """
    run = results.first()

    env_local = _as_float_array(getattr(run, "env_local_time_hist", None))
    dynamics = _as_float_array(getattr(run, "dynamics_time_hist", None))
    rpc_rtt = _as_float_array(getattr(run, "control_rpc_time_hist", None))
    rpc_server = _as_float_array(getattr(run, "control_rpc_server_time_hist", None))

    env_local_mean = _safe_mean(env_local)
    dynamics_mean = _safe_mean(dynamics)
    env_setup_mean = _safe_mean(env_local + dynamics) if (env_local.size and dynamics.size) else None

    rpc_rtt_mean = _safe_mean(rpc_rtt)
    rpc_server_mean = _safe_mean(rpc_server)
    rpc_comm_arr = np.asarray([], dtype=float)
    if rpc_rtt.size and rpc_server.size:
        n = min(rpc_rtt.size, rpc_server.size)
        rpc_comm_arr = np.maximum(rpc_rtt[:n] - rpc_server[:n], 0.0)
    elif rpc_rtt.size:
        rpc_comm_arr = rpc_rtt
    rpc_comm_mean = _safe_mean(rpc_comm_arr)

    step_count = int(len(getattr(run, "time_s", [])) if getattr(run, "time_s", None) is not None else 0)

    print("\n=== HIL Timing Summary (simulate_remote) ===")
    print(f"Endpoint: {remote.host}:{remote.port}")
    print(f"Steps: {step_count}")
    print(f"Total wall-clock run time: {_fmt_seconds(wall_clock_s)}")
    print(f"Average environmental setup (orbit calc/prop): {_fmt_seconds(env_setup_mean)}")
    print(f"Average communication uplink/downlink: {_fmt_ms(rpc_comm_mean)}")
    print(f"Average server calculation time: {_fmt_ms(rpc_server_mean)}")
    print(f"Average environment calculation time (locally): {_fmt_ms(env_local_mean)}")
    print(f"Average local dynamics propagation time: {_fmt_ms(dynamics_mean)}")
    print(f"RTT p50 / p95: {_fmt_ms(_safe_percentile(rpc_rtt, 50))} / {_fmt_ms(_safe_percentile(rpc_rtt, 95))}")
    print(
        f"Server p50 / p95: {_fmt_ms(_safe_percentile(rpc_server, 50))} / "
        f"{_fmt_ms(_safe_percentile(rpc_server, 95))}"
    )
    print("===========================================")


[docs] def simulate_remote( x: np.ndarray, satellite: Satellite, os0: Orbital_State, *, controller: Optional[Controller] = None, estimator: Optional[Attitude_Estimator] = None, orbit_estimator: Optional[Orbit_Estimator] = None, goal: Optional[Goal | GoalList] = None, dt: float = 1.0, tf: float = 500.0, est_satellite: Optional[EstimatedSatellite] = None, remote: Optional[RemoteSimulationConfig] = None, ) -> SimulationResults: r""" Run an ADCS simulation with optional remoteized controller/estimator components. This wrapper configures RPC proxies based on ``remote`` placement settings, performs connectivity preflight checks, delegates execution to :func:`ADCS.simulate.simulate`, and prints a hardware-in-the-loop timing summary after completion. :param x: Initial true satellite state vector. :type x: numpy.ndarray :param satellite: True satellite model used for dynamics and sensor generation. :type satellite: :class:`~ADCS.satellite_hardware.satellite.satellite.Satellite` :param os0: Initial orbital state at simulation start. :type os0: :class:`~ADCS.orbits.orbital_state.Orbital_State` :param controller: Local controller implementation when controller placement is local. :type controller: :class:`~ADCS.controller.controller.Controller` or None :param estimator: Local attitude estimator when estimator placement is local. :type estimator: :class:`~ADCS.estimators.attitude_estimators.attitude_estimator.Attitude_Estimator` or None :param orbit_estimator: Local orbit estimator when orbit-estimator placement is local. :type orbit_estimator: :class:`~ADCS.estimators.orbit_estimators.orbit_estimator.Orbit_Estimator` or None :param goal: Optional pointing objective (single goal or goal list). :type goal: :class:`~ADCS.CONOPS.goals.goal.Goal`, :class:`~ADCS.CONOPS.goallist.goallist.GoalList`, or None :param dt: Simulation step size in seconds. :type dt: float :param tf: Simulation duration in seconds. :type tf: float :param est_satellite: Estimated satellite model for GNC components. :type est_satellite: :class:`~ADCS.satellite_hardware.satellite.estimated_satellite.EstimatedSatellite` or None :param remote: Remote execution placement and networking settings. If ``None``, defaults from :class:`~ADCS.remote.controller_rpc.RemoteSimulationConfig` are used. :type remote: :class:`~ADCS.remote.controller_rpc.RemoteSimulationConfig` or None :return: Full simulation history container. :rtype: :class:`~ADCS.helpers.simresults.SimulationResults` :raises ConnectionError: If a component is configured as remote and preflight RPC ping fails. """ if remote is None: remote = RemoteSimulationConfig() wall_t0 = time.perf_counter() controller_for_loop = controller if remote.controller == ComponentLocation.REMOTE: controller_for_loop = RemoteControllerProxy( host=remote.host, port=remote.port, timeout_s=remote.timeout_s, retries=remote.retries, ) try: controller_for_loop.ping() except Exception as exc: raise ConnectionError( "Remote controller preflight failed. " f"Could not reach RPC server at {remote.host}:{remote.port}. " "Start debug/debug_remote/run_remote_universal.py on the remote host, " "or set ADCS_REMOTE_HOST/ADCS_REMOTE_PORT to a reachable endpoint." ) from exc estimator_for_loop = estimator if remote.estimator == ComponentLocation.REMOTE: estimator_for_loop = RemoteAttitudeEstimatorProxy( host=remote.host, port=remote.port, timeout_s=remote.timeout_s, retries=remote.retries, ) try: estimator_for_loop.ping() except Exception as exc: raise ConnectionError( "Remote attitude-estimator preflight failed. " f"Could not reach RPC server at {remote.host}:{remote.port}. " "Start debug/debug_remote/run_remote_universal.py with an attitude-estimator component, " "or set ADCS_REMOTE_HOST/ADCS_REMOTE_PORT to a reachable endpoint." ) from exc orbit_estimator_for_loop = orbit_estimator if remote.orbit_estimator == ComponentLocation.REMOTE: orbit_estimator_for_loop = RemoteOrbitEstimatorProxy( host=remote.host, port=remote.port, timeout_s=remote.timeout_s, retries=remote.retries, ) try: orbit_estimator_for_loop.ping() except Exception as exc: raise ConnectionError( "Remote orbit-estimator preflight failed. " f"Could not reach RPC server at {remote.host}:{remote.port}. " "Start debug/debug_remote/run_remote_universal.py with an orbit-estimator component, " "or set ADCS_REMOTE_HOST/ADCS_REMOTE_PORT to a reachable endpoint." ) from exc results = simulate( x=x, satellite=satellite, est_satellite=est_satellite, controller=controller_for_loop, estimator=estimator_for_loop, orbit_estimator=orbit_estimator_for_loop, goal=goal, os0=os0, dt=dt, tf=tf, ) wall_clock_s = time.perf_counter() - wall_t0 _print_hil_timing_summary(results=results, remote=remote, wall_clock_s=wall_clock_s) return results