from __future__ import annotations
__all__ = [
"ComponentLocation",
"RemoteSimulationConfig",
"RemoteControllerProxy",
"RemoteAttitudeEstimatorProxy",
"RemoteOrbitEstimatorProxy",
"serve_remote_components",
"serve_remote_component",
"serve_remote_controller",
]
from dataclasses import dataclass
from enum import Enum
from functools import lru_cache
from typing import Any
from xmlrpc.client import ServerProxy, Transport
from xmlrpc.server import SimpleXMLRPCServer
import http.client
import time
import numpy as np
from ADCS.CONOPS.goals import ECI_Goal, Goal, No_Goal
from ADCS.controller.controller import Controller
from ADCS.estimators.attitude_estimators import Attitude_Estimator
from ADCS.estimators.estimator_helpers import EstimatedOrbital_State
from ADCS.estimators.orbit_estimators import Orbit_Estimator
from ADCS.orbits.orbital_state import Ephemeris, Orbital_State
[docs]
class ComponentLocation(str, Enum):
LOCAL = "local"
REMOTE = "remote"
[docs]
@dataclass
class RemoteSimulationConfig:
controller: ComponentLocation = ComponentLocation.REMOTE
estimator: ComponentLocation = ComponentLocation.LOCAL
orbit_estimator: ComponentLocation = ComponentLocation.LOCAL
host: str = "10.77.0.4"
port: int = 5000
timeout_s: float = 0.25
retries: int = 0
fallback: str = "raise"
class _TimeoutTransport(Transport):
"""XML-RPC transport with per-connection timeout enforcement."""
def __init__(self, timeout_s: float) -> None:
super().__init__()
self._timeout_s = float(timeout_s)
def make_connection(self, host: str):
return http.client.HTTPConnection(host, timeout=self._timeout_s)
def _goal_to_payload(goal: Goal | None) -> dict[str, Any]:
"""Serialize a goal object into an XML-RPC-safe dictionary.
Only goal types currently used by the remote controller path are supported.
The payload is intentionally explicit (with a ``kind`` tag) so the receiver
can safely reconstruct the correct ADCS goal class.
:param goal:
Goal object to serialize. ``None`` is treated as :class:`No_Goal`.
:type goal:
:class:`~ADCS.CONOPS.goals.Goal` or None
:return:
XML-RPC-safe dictionary representation of the goal.
:rtype:
dict[str, Any]
:raises NotImplementedError:
If the goal type is not supported by the remote transport format.
"""
if goal is None or isinstance(goal, No_Goal):
return {"kind": "No_Goal"}
if isinstance(goal, ECI_Goal):
return {
"kind": "ECI_Goal",
"eci_vector": np.asarray(goal.eci_vector, dtype=float).reshape(3).tolist(),
"boresight_name": goal.boresight_name,
}
raise NotImplementedError(
f"Remote controller RPC only supports No_Goal and ECI_Goal for now, got {type(goal).__name__}."
)
def _xmlrpc_safe(value: Any) -> Any:
"""Recursively convert ADCS/NumPy objects into XML-RPC-safe primitives.
XML-RPC transports only basic Python scalar/list/dict types. This helper
normalizes NumPy arrays/scalars and ADCS container objects so request and
response payloads can be serialized by :mod:`xmlrpc.client`.
:param value:
Arbitrary value to normalize for XML-RPC transport.
:type value:
Any
:return:
A transport-safe representation of ``value``.
:rtype:
Any
"""
if isinstance(value, EstimatedOrbital_State):
return _estimated_orbital_state_to_payload(value)
if isinstance(value, np.ndarray):
return value.tolist()
if isinstance(value, np.generic):
return value.item()
if isinstance(value, dict):
return {key: _xmlrpc_safe(item) for key, item in value.items()}
if isinstance(value, (list, tuple)):
return [_xmlrpc_safe(item) for item in value]
return value
def _goal_from_payload(payload: dict[str, Any]) -> Goal:
"""Deserialize a goal payload back into an ADCS goal object.
:param payload:
Goal payload produced by :func:`_goal_to_payload`.
:type payload:
dict[str, Any]
:return:
Reconstructed ADCS goal instance.
:rtype:
:class:`~ADCS.CONOPS.goals.Goal`
:raises NotImplementedError:
If the payload encodes an unsupported goal ``kind``.
"""
kind = payload.get("kind", "No_Goal")
if kind == "No_Goal":
return No_Goal()
if kind == "ECI_Goal":
return ECI_Goal(
eci_vector=np.asarray(payload["eci_vector"], dtype=float).reshape(3),
boresight_name=payload.get("boresight_name"),
)
raise NotImplementedError(f"Unsupported remote goal kind: {kind}")
def _print_remote_marker(marker: str) -> None:
"""Print a compact per-call marker in the server terminal.
:param marker:
Single-character marker identifying the RPC type (for example ``C``,
``A``, or ``O``).
:type marker:
str
"""
print(marker, end="", flush=True)
def _os_to_payload(os0: Orbital_State | None) -> dict[str, Any] | None:
"""Serialize an orbital state for XML-RPC transport.
:param os0:
Orbital state to serialize.
:type os0:
:class:`~ADCS.orbits.orbital_state.Orbital_State` or None
:return:
``None`` if input is ``None``; otherwise a dictionary payload containing
orbital-state fields.
:rtype:
dict[str, Any] or None
"""
return None if os0 is None else _xmlrpc_safe(os0.to_dict())
def _os_from_payload(payload: dict[str, Any] | None) -> Orbital_State | None:
"""Deserialize an orbital-state payload using a cached ephemeris.
:param payload:
Orbital-state dictionary payload, or ``None``.
:type payload:
dict[str, Any] or None
:return:
Reconstructed orbital state object, or ``None``.
:rtype:
:class:`~ADCS.orbits.orbital_state.Orbital_State` or None
"""
if payload is None:
return None
return Orbital_State.from_dict(payload, ephem=_shared_ephemeris(), density_model=None, fast=True)
@lru_cache(maxsize=1)
def _shared_ephemeris() -> Ephemeris:
"""Return a process-wide cached ephemeris instance.
Reusing the same ephemeris instance avoids repeated file loading and reduces
per-request overhead during high-rate RPC deserialization.
:return:
Cached ephemeris object.
:rtype:
:class:`~ADCS.orbits.orbital_state.Ephemeris`
"""
return Ephemeris()
def _estimated_orbital_state_to_payload(state: EstimatedOrbital_State | None) -> dict[str, Any] | None:
"""Serialize an estimated orbital state into an XML-RPC-safe dictionary.
:param state:
Estimated orbital state bundle to serialize.
:type state:
:class:`~ADCS.estimators.estimator_helpers.EstimatedOrbital_State` or None
:return:
Transport-safe dictionary containing ``os``, ``P``, and ``Q`` fields,
or ``None`` if ``state`` is ``None``.
:rtype:
dict[str, Any] or None
"""
if state is None:
return None
return {
"os": _os_to_payload(state.os),
"P": _xmlrpc_safe(np.asarray(state.P, dtype=float)),
"Q": _xmlrpc_safe(np.asarray(state.Q, dtype=float)),
}
def _estimated_orbital_state_from_payload(payload: dict[str, Any] | None) -> EstimatedOrbital_State | None:
"""Deserialize an estimated orbital-state payload.
:param payload:
Payload generated by :func:`_estimated_orbital_state_to_payload`.
:type payload:
dict[str, Any] or None
:return:
Reconstructed estimated orbital-state bundle.
:rtype:
:class:`~ADCS.estimators.estimator_helpers.EstimatedOrbital_State` or None
"""
if payload is None:
return None
return EstimatedOrbital_State(
os=_os_from_payload(payload["os"]),
P=np.asarray(payload["P"], dtype=float),
Q=np.asarray(payload["Q"], dtype=float),
)
class RemoteControllerService:
"""Server-side RPC endpoint for controller `find_u` evaluation."""
def __init__(self, controller: Any) -> None:
self.controller = controller
def ping(self) -> bool:
"""Return service liveness for preflight health checks.
:return:
``True`` when the XML-RPC service is reachable.
:rtype:
bool
"""
return True
def find_u(self, payload: dict[str, Any]) -> dict[str, Any]:
"""Evaluate controller command generation for one simulation step.
:param payload:
Request payload containing serialized controller inputs.
:type payload:
dict[str, Any]
:return:
Response payload with commanded control vector ``u`` and measured
server compute duration in seconds.
:rtype:
dict[str, Any]
"""
_print_remote_marker("C")
start = time.perf_counter()
x_hat = np.asarray(payload["x_hat"], dtype=float)
sens = np.asarray(payload["sens"], dtype=float)
os_hat = _os_from_payload(payload.get("os_hat"))
goal = _goal_from_payload(payload.get("goal", {"kind": "No_Goal"}))
u = self.controller.find_u(
x_hat=x_hat,
sens=sens,
est_sat=self.controller.est_sat,
os_hat=os_hat,
goal=goal,
)
end = time.perf_counter()
return {
"u": np.asarray(u, dtype=float).reshape(-1).tolist(),
"server_compute_s": end - start,
}
class RemoteAttitudeEstimatorService:
"""Server-side RPC endpoint for attitude-estimator `update` calls."""
def __init__(self, estimator: Attitude_Estimator) -> None:
self.estimator = estimator
def ping(self) -> bool:
"""Return service liveness for preflight health checks.
:return:
``True`` when the XML-RPC service is reachable.
:rtype:
bool
"""
return True
def update(self, payload: dict[str, Any]) -> dict[str, Any]:
"""Run one remote attitude-estimator update.
:param payload:
Request payload containing control input, sensor readings, and
optional orbital-state estimate.
:type payload:
dict[str, Any]
:return:
Response payload containing the updated estimated state vector and
server compute duration in seconds.
:rtype:
dict[str, Any]
"""
_print_remote_marker("A")
start = time.perf_counter()
u = np.asarray(payload["u"], dtype=float)
sensors = [np.asarray(sensor, dtype=float) for sensor in payload["sensors"]]
os_hat = _os_from_payload(payload.get("os"))
x_hat = self.estimator.update(u=u, sensors=sensors, os=os_hat)
end = time.perf_counter()
return {
"x_hat": np.asarray(x_hat, dtype=float).reshape(-1).tolist(),
"server_compute_s": end - start,
}
class RemoteOrbitEstimatorService:
"""Server-side RPC endpoint for orbit-estimator `update` calls."""
def __init__(self, estimator: Orbit_Estimator) -> None:
self.estimator = estimator
def ping(self) -> bool:
"""Return service liveness for preflight health checks.
:return:
``True`` when the XML-RPC service is reachable.
:rtype:
bool
"""
return True
def update(self, payload: dict[str, Any]) -> dict[str, Any]:
"""Run one remote orbit-estimator update.
:param payload:
Request payload containing GPS measurements and current J2000 time.
:type payload:
dict[str, Any]
:return:
Response payload containing serialized estimated orbital state and
server compute duration in seconds.
:rtype:
dict[str, Any]
"""
_print_remote_marker("O")
start = time.perf_counter()
gps_measurements = [np.asarray(measurement, dtype=float) for measurement in payload["GPS_measurements"]]
J2000 = float(payload["J2000"])
os_hat = self.estimator.update(GPS_measurements=gps_measurements, J2000=J2000)
end = time.perf_counter()
return {
"os_hat": _estimated_orbital_state_to_payload(os_hat),
"server_compute_s": end - start,
}
class RemoteCompositeService:
"""Unified RPC service that can host controller and estimator components together."""
def __init__(
self,
controller: Controller | None = None,
estimator: Attitude_Estimator | None = None,
orbit_estimator: Orbit_Estimator | None = None,
) -> None:
self.controller = controller
self.estimator = estimator
self.orbit_estimator = orbit_estimator
def ping(self) -> bool:
"""Return service liveness for preflight health checks.
:return:
``True`` when the XML-RPC service is reachable.
:rtype:
bool
"""
return True
def find_u(self, payload: dict[str, Any]) -> dict[str, Any]:
"""Dispatch a remote controller ``find_u`` request.
:param payload:
Request payload containing controller inputs.
:type payload:
dict[str, Any]
:return:
Response payload containing control output ``u`` and server compute
duration in seconds.
:rtype:
dict[str, Any]
:raises RuntimeError:
If no remote controller was configured on this server.
"""
if self.controller is None:
raise RuntimeError("No remote controller is configured on this server.")
_print_remote_marker("C")
start = time.perf_counter()
x_hat = np.asarray(payload["x_hat"], dtype=float)
sens = np.asarray(payload["sens"], dtype=float)
os_hat = _os_from_payload(payload.get("os_hat"))
goal = _goal_from_payload(payload.get("goal", {"kind": "No_Goal"}))
u = self.controller.find_u(
x_hat=x_hat,
sens=sens,
est_sat=self.controller.est_sat,
os_hat=os_hat,
goal=goal,
)
end = time.perf_counter()
return {
"u": np.asarray(u, dtype=float).reshape(-1).tolist(),
"server_compute_s": end - start,
}
def update(self, payload: dict[str, Any]) -> dict[str, Any]:
"""Dispatch a remote estimator update request.
The request is routed to the orbit estimator or attitude estimator based
on the explicit ``component`` field, or inferred payload keys for legacy
callers.
:param payload:
Request payload identifying estimator target and inputs.
:type payload:
dict[str, Any]
:return:
Response payload containing estimator outputs and server compute time.
:rtype:
dict[str, Any]
:raises RuntimeError:
If the requested estimator component is not configured.
:raises ValueError:
If payload does not identify a supported estimator request type.
"""
component = payload.get("component")
if component == "orbit_estimator" or (component is None and "GPS_measurements" in payload):
if self.orbit_estimator is None:
raise RuntimeError("No remote orbit estimator is configured on this server.")
_print_remote_marker("O")
start = time.perf_counter()
gps_measurements = [np.asarray(measurement, dtype=float) for measurement in payload["GPS_measurements"]]
J2000 = float(payload["J2000"])
os_hat = self.orbit_estimator.update(GPS_measurements=gps_measurements, J2000=J2000)
end = time.perf_counter()
return {
"os_hat": _estimated_orbital_state_to_payload(os_hat),
"server_compute_s": end - start,
}
if component == "attitude_estimator" or (component is None and "sensors" in payload):
if self.estimator is None:
raise RuntimeError("No remote attitude estimator is configured on this server.")
_print_remote_marker("A")
start = time.perf_counter()
u = np.asarray(payload["u"], dtype=float)
sensors = [np.asarray(sensor, dtype=float) for sensor in payload["sensors"]]
os_hat = _os_from_payload(payload.get("os"))
x_hat = self.estimator.update(u=u, sensors=sensors, os=os_hat)
end = time.perf_counter()
return {
"x_hat": np.asarray(x_hat, dtype=float).reshape(-1).tolist(),
"server_compute_s": end - start,
}
raise ValueError(
"Remote update payload did not identify an orbit estimator or attitude estimator request."
)
class _RemoteProxyBase:
"""Shared client-side RPC wrapper with retry and timing bookkeeping."""
def __init__(self, *, host: str, port: int, timeout_s: float = 0.25, retries: int = 0) -> None:
self.host = host
self.port = int(port)
self.timeout_s = float(timeout_s)
self.retries = int(retries)
self.last_roundtrip_s: float | None = None
self.last_server_s: float | None = None
self.roundtrip_hist: list[float] = []
self.server_hist: list[float] = []
self._proxy = ServerProxy(
f"http://{self.host}:{self.port}",
allow_none=True,
transport=_TimeoutTransport(self.timeout_s),
)
def ping(self) -> bool:
"""Ping the remote XML-RPC endpoint.
:return:
``True`` if the remote endpoint responds to health checks.
:rtype:
bool
"""
return bool(self._proxy.ping())
def _call(self, method_name: str, payload: dict[str, Any]) -> dict[str, Any]:
"""Perform an RPC call with retry and timing bookkeeping.
:param method_name:
Remote method name to invoke on the XML-RPC server.
:type method_name:
str
:param payload:
XML-RPC-safe request payload.
:type payload:
dict[str, Any]
:return:
RPC response dictionary.
:rtype:
dict[str, Any]
:raises RuntimeError:
If all retries fail and no response is received.
"""
start = time.perf_counter()
response = None
last_error: Exception | None = None
remote_method = getattr(self._proxy, method_name)
for _ in range(self.retries + 1):
try:
response = remote_method(payload)
break
except Exception as exc: # pragma: no cover - network failure path
last_error = exc
if response is None:
raise RuntimeError(f"Remote call {method_name} failed for {self.host}:{self.port}") from last_error
end = time.perf_counter()
self.last_roundtrip_s = end - start
self.last_server_s = float(response.get("server_compute_s", float("nan")))
self.roundtrip_hist.append(self.last_roundtrip_s)
self.server_hist.append(self.last_server_s)
return response
[docs]
class RemoteControllerProxy:
"""Client-side proxy exposing controller-compatible `find_u` calls."""
def __init__(self, *, host: str, port: int, timeout_s: float = 0.25, retries: int = 0) -> None:
self._base = _RemoteProxyBase(host=host, port=port, timeout_s=timeout_s, retries=retries)
@property
def host(self) -> str:
return self._base.host
@property
def port(self) -> int:
return self._base.port
@property
def last_roundtrip_s(self) -> float | None:
return self._base.last_roundtrip_s
@property
def last_server_s(self) -> float | None:
return self._base.last_server_s
@property
def roundtrip_hist(self) -> list[float]:
return self._base.roundtrip_hist
@property
def server_hist(self) -> list[float]:
return self._base.server_hist
[docs]
def ping(self) -> bool:
"""Ping the remote server used by this controller proxy.
:return:
``True`` if the remote endpoint responds.
:rtype:
bool
"""
return self._base.ping()
[docs]
def find_u(self, x_hat: np.ndarray, sens: np.ndarray, est_sat: Any, os_hat: Orbital_State, goal: Goal | None = None) -> np.ndarray:
"""Execute remote controller command synthesis.
:param x_hat:
Estimated state vector for controller input.
:type x_hat:
numpy.ndarray
:param sens:
Sensor measurement vector for controller input.
:type sens:
numpy.ndarray
:param est_sat:
Estimated satellite model (unused for transport parity).
:type est_sat:
Any
:param os_hat:
Orbital state estimate.
:type os_hat:
:class:`~ADCS.orbits.orbital_state.Orbital_State`
:param goal:
Active guidance goal for the current step.
:type goal:
:class:`~ADCS.CONOPS.goals.Goal` or None
:return:
Control command vector from the remote controller.
:rtype:
numpy.ndarray
"""
payload = {
"x_hat": np.asarray(x_hat, dtype=float).reshape(-1).tolist(),
"sens": np.asarray(sens, dtype=float).reshape(-1).tolist(),
"os_hat": _os_to_payload(os_hat),
"goal": _goal_to_payload(goal),
}
response = self._base._call("find_u", payload)
return np.asarray(response["u"], dtype=float).reshape(-1)
[docs]
class RemoteAttitudeEstimatorProxy:
"""Client-side proxy exposing attitude-estimator-compatible `update` calls."""
def __init__(self, *, host: str, port: int, timeout_s: float = 0.25, retries: int = 0) -> None:
self._base = _RemoteProxyBase(host=host, port=port, timeout_s=timeout_s, retries=retries)
@property
def host(self) -> str:
return self._base.host
@property
def port(self) -> int:
return self._base.port
@property
def last_roundtrip_s(self) -> float | None:
return self._base.last_roundtrip_s
@property
def last_server_s(self) -> float | None:
return self._base.last_server_s
@property
def roundtrip_hist(self) -> list[float]:
return self._base.roundtrip_hist
@property
def server_hist(self) -> list[float]:
return self._base.server_hist
[docs]
def ping(self) -> bool:
"""Ping the remote server used by this attitude-estimator proxy.
:return:
``True`` if the remote endpoint responds.
:rtype:
bool
"""
return self._base.ping()
[docs]
def update(self, u: np.ndarray, sensors: list[np.ndarray], os: Orbital_State) -> np.ndarray:
"""Execute one remote attitude-estimator update.
:param u:
Applied control command vector.
:type u:
numpy.ndarray
:param sensors:
Sensor measurement list for the estimator update.
:type sensors:
list[numpy.ndarray]
:param os:
Orbital-state estimate used by the estimator.
:type os:
:class:`~ADCS.orbits.orbital_state.Orbital_State`
:return:
Updated attitude-state estimate vector.
:rtype:
numpy.ndarray
"""
payload = {
"component": "attitude_estimator",
"u": np.asarray(u, dtype=float).reshape(-1).tolist(),
"sensors": [_xmlrpc_safe(np.asarray(sensor, dtype=float)) for sensor in sensors],
"os": _os_to_payload(os),
}
response = self._base._call("update", payload)
return np.asarray(response["x_hat"], dtype=float).reshape(-1)
[docs]
class RemoteOrbitEstimatorProxy:
"""Client-side proxy exposing orbit-estimator-compatible `update` calls."""
def __init__(self, *, host: str, port: int, timeout_s: float = 0.25, retries: int = 0) -> None:
self._base = _RemoteProxyBase(host=host, port=port, timeout_s=timeout_s, retries=retries)
@property
def host(self) -> str:
return self._base.host
@property
def port(self) -> int:
return self._base.port
@property
def last_roundtrip_s(self) -> float | None:
return self._base.last_roundtrip_s
@property
def last_server_s(self) -> float | None:
return self._base.last_server_s
@property
def roundtrip_hist(self) -> list[float]:
return self._base.roundtrip_hist
@property
def server_hist(self) -> list[float]:
return self._base.server_hist
[docs]
def ping(self) -> bool:
"""Ping the remote server used by this orbit-estimator proxy.
:return:
``True`` if the remote endpoint responds.
:rtype:
bool
"""
return self._base.ping()
[docs]
def update(self, GPS_measurements: list[np.ndarray], J2000: float) -> EstimatedOrbital_State:
"""Execute one remote orbit-estimator update.
:param GPS_measurements:
List of GPS measurement vectors.
:type GPS_measurements:
list[numpy.ndarray]
:param J2000:
Current simulation time in Julian centuries from J2000.
:type J2000:
float
:return:
Updated estimated orbital state.
:rtype:
:class:`~ADCS.estimators.estimator_helpers.EstimatedOrbital_State`
"""
payload = {
"component": "orbit_estimator",
"GPS_measurements": [_xmlrpc_safe(np.asarray(measurement, dtype=float)) for measurement in GPS_measurements],
"J2000": float(J2000),
}
response = self._base._call("update", payload)
return _estimated_orbital_state_from_payload(response["os_hat"])
[docs]
def serve_remote_components(
*,
controller: Controller | None = None,
estimator: Attitude_Estimator | None = None,
orbit_estimator: Orbit_Estimator | None = None,
host: str = "0.0.0.0",
port: int = 5000,
) -> None:
"""Serve any combination of ADCS components over a single XML-RPC endpoint.
:param controller:
Optional controller component handling ``find_u`` requests.
:type controller:
:class:`~ADCS.controller.controller.Controller` or None
:param estimator:
Optional attitude estimator handling attitude ``update`` requests.
:type estimator:
:class:`~ADCS.estimators.attitude_estimators.Attitude_Estimator` or None
:param orbit_estimator:
Optional orbit estimator handling orbit ``update`` requests.
:type orbit_estimator:
:class:`~ADCS.estimators.orbit_estimators.Orbit_Estimator` or None
:param host:
Bind address for the server.
:type host:
str
:param port:
Bind port for the server.
:type port:
int
:raises ValueError:
If no components are provided.
"""
if controller is None and estimator is None and orbit_estimator is None:
raise ValueError("At least one remote component must be provided.")
service = RemoteCompositeService(
controller=controller,
estimator=estimator,
orbit_estimator=orbit_estimator,
)
server = SimpleXMLRPCServer((host, int(port)), allow_none=True, logRequests=False)
server.register_introspection_functions()
server.register_instance(service, allow_dotted_names=False)
print(
"[RemoteCompositeService] listening on "
f"{host}:{port} "
f"(controller={'yes' if controller is not None else 'no'}, "
f"attitude_estimator={'yes' if estimator is not None else 'no'}, "
f"orbit_estimator={'yes' if orbit_estimator is not None else 'no'})"
)
server.serve_forever()
[docs]
def serve_remote_component(component: Any, *, host: str = "0.0.0.0", port: int = 5000) -> None:
"""Serve one ADCS component by dispatching to the composite helper.
:param component:
Component instance to serve.
:type component:
Any
:param host:
Bind address for the server.
:type host:
str
:param port:
Bind port for the server.
:type port:
int
:raises TypeError:
If ``component`` is not one of the supported ADCS component classes.
"""
if isinstance(component, Controller):
serve_remote_components(controller=component, host=host, port=port)
return
elif isinstance(component, Attitude_Estimator):
serve_remote_components(estimator=component, host=host, port=port)
return
elif isinstance(component, Orbit_Estimator):
serve_remote_components(orbit_estimator=component, host=host, port=port)
return
else:
raise TypeError(
"Unsupported remote component type. Expected a Controller, Attitude_Estimator, or Orbit_Estimator."
)
[docs]
def serve_remote_controller(controller: Any, *, host: str = "0.0.0.0", port: int = 5000) -> None:
"""Backward-compatible controller-only server entry point.
:param controller:
Controller instance to serve.
:type controller:
Any
:param host:
Bind address for the server.
:type host:
str
:param port:
Bind port for the server.
:type port:
int
"""
serve_remote_component(controller, host=host, port=port)