diff --git a/src/RSIPI/__init__.py b/src/RSIPI/__init__.py index e69de29..43ff60c 100644 --- a/src/RSIPI/__init__.py +++ b/src/RSIPI/__init__.py @@ -0,0 +1,105 @@ +""" +RSIPI - Robot Sensor Interface Python Integration + +A lightweight Python library for real-time control of KUKA industrial robots +via the RSI 3.3 protocol. Provides high-level namespaced API for motion control, +I/O, logging, visualization, and KRL program manipulation. + +Example: + >>> from RSIPI import RSIAPI + >>> api = RSIAPI('RSI_EthernetConfig.xml') + >>> api.start() + >>> api.motion.update_cartesian(X=10, Y=5, Z=0) + >>> api.stop() +""" + +__version__ = "2.0.0" +__author__ = "RSIPI Development Team" + +# Main API +from .rsi_api import RSIAPI + +# Namespace APIs (for type hints and advanced use) +from .motion_api import MotionAPI +from .io_api import IOAPI +from .krl_api import KRLAPI +from .safety_api import SafetyAPI +from .monitoring_api import MonitoringAPI +from .logging_api import LoggingAPI +from .diagnostics_api import DiagnosticsAPI +from .viz_api import VizAPI +from .tools_api import ToolsAPI + +# Core client (for advanced use) +from .rsi_client import RSIClient, ClientState + +# Exceptions +from .exceptions import ( + RSIError, + RSINetworkError, + RSIConnectionError, + RSITimeoutError, + RSIPacketError, + RSISafetyError, + RSISafetyViolation, + RSIEmergencyStop, + RSILimitExceeded, + RSIConfigError, + RSIConfigParseError, + RSIMissingConfigError, + RSIStateError, + RSIInvalidTransition, + RSIClientNotReady, + RSIDataError, + RSILoggingError, + RSIVariableError, + RSIMotionError, + RSITrajectoryError, + RSIKinematicsError, +) + +__all__ = [ + # Main API (primary entry point) + "RSIAPI", + + # Namespace APIs + "MotionAPI", + "IOAPI", + "KRLAPI", + "SafetyAPI", + "MonitoringAPI", + "LoggingAPI", + "DiagnosticsAPI", + "VizAPI", + "ToolsAPI", + + # Core + "RSIClient", + "ClientState", + + # Exceptions + "RSIError", + "RSINetworkError", + "RSIConnectionError", + "RSITimeoutError", + "RSIPacketError", + "RSISafetyError", + "RSISafetyViolation", + "RSIEmergencyStop", + "RSILimitExceeded", + "RSIConfigError", + "RSIConfigParseError", + "RSIMissingConfigError", + "RSIStateError", + "RSIInvalidTransition", + "RSIClientNotReady", + "RSIDataError", + "RSILoggingError", + "RSIVariableError", + "RSIMotionError", + "RSITrajectoryError", + "RSIKinematicsError", + + # Version + "__version__", +] diff --git a/src/RSIPI/config_parser.py b/src/RSIPI/config_parser.py index 8ffa6c9..b845296 100644 --- a/src/RSIPI/config_parser.py +++ b/src/RSIPI/config_parser.py @@ -1,5 +1,7 @@ import logging import xml.etree.ElementTree as ET +from typing import Dict, Any, Tuple, Union, Optional +from .exceptions import RSIConfigError, RSIConfigParseError, RSIMissingConfigError class ConfigParser: """ @@ -8,23 +10,22 @@ class ConfigParser: safety limits from an RSI limits XML file. """ - def __init__(self, config_file, rsi_limits_file=None): + def __init__(self, config_file: str, rsi_limits_file: Optional[str] = None) -> None: """ - Constructor method that loads the config file, parses variable definitions, and optionally - loads safety limits. + Load and parse RSI configuration file with optional safety limits. Args: - config_file (str): Path to the RSI_EthernetConfig.xml file. - rsi_limits_file (str, optional): Path to .rsi.xml file containing safety limits. + config_file: Path to the RSI_EthernetConfig.xml file + rsi_limits_file: Optional path to .rsi.xml file containing safety limits """ from .rsi_limit_parser import parse_rsi_limits - self.config_file = config_file - self.rsi_limits_file = rsi_limits_file - self.safety_limits = {} + self.config_file: str = config_file + self.rsi_limits_file: Optional[str] = rsi_limits_file + self.safety_limits: Dict[str, Tuple[float, float]] = {} # Defines known internal variable structures used in RSI messaging - self.internal_structure = { + self.internal_structure: Dict[str, Union[str, int, Dict[str, float]]] = { "ComStatus": "String", "RIst": {"X":0, "Y":0, "Z":0, "A":0, "B":0, "C":0}, "RSol": {"X":0, "Y":0, "Z":0, "A":0, "B":0, "C":0}, @@ -52,7 +53,9 @@ class ConfigParser: "Tech.T6": {"T61":0, "T62":0, "T63":0, "T64":0, "T65":0, "T66":0, "T67":0, "T68":0, "T69":0, "T610":0}, } - self.network_settings = {} + self.network_settings: Dict[str, Any] = {} + self.receive_variables: Dict[str, Any] + self.send_variables: Dict[str, Any] self.receive_variables, self.send_variables = self.process_config() # Flatten Tech.CX and Tech.TX keys into a single 'Tech' dictionary @@ -67,19 +70,24 @@ class ConfigParser: if self.rsi_limits_file: try: self.safety_limits = parse_rsi_limits(self.rsi_limits_file) + logging.info(f"Loaded safety limits from {rsi_limits_file}") except Exception as e: - print(f"[WARNING] Failed to load .rsi.xml safety limits: {e}") + logging.warning(f"Failed to load .rsi.xml safety limits: {e}") self.safety_limits = {} - def process_config(self): + def process_config(self) -> Tuple[Dict[str, Any], Dict[str, Any]]: """ - Parses the RSI config file and builds the send/receive variable dictionaries. + Parse the RSI config file and build send/receive variable dictionaries. Returns: - tuple: (send_vars, receive_vars) structured dictionaries. + Tuple of (receive_vars, send_vars) structured dictionaries + + Raises: + RSIConfigParseError: If config file cannot be parsed + RSIMissingConfigError: If required settings are missing """ - send_vars = {} - receive_vars = {} + send_vars: Dict[str, Any] = {} + receive_vars: Dict[str, Any] = {} try: tree = ET.parse(self.config_file) @@ -88,7 +96,7 @@ class ConfigParser: # Extract section for IP/port/etc. config = root.find("CONFIG") if config is None: - raise ValueError("Missing section in RSI_EthernetConfig.xml") + raise RSIMissingConfigError("Missing section in RSI_EthernetConfig.xml") self.network_settings = { "ip": config.find("IP_NUMBER").text.strip() if config.find("IP_NUMBER") is not None else None, @@ -97,10 +105,10 @@ class ConfigParser: "onlysend": config.find("ONLYSEND").text.strip().upper() == "TRUE" if config.find("ONLYSEND") is not None else False, } - print(f"✅ Loaded network settings: {self.network_settings}") + logging.info(f"Loaded network settings: {self.network_settings}") if None in self.network_settings.values(): - raise ValueError("Missing one or more required network settings (ip, port, sentype, onlysend)") + raise RSIMissingConfigError("Missing one or more required network settings (ip, port, sentype, onlysend)") # Parse SEND section send_section = root.find("SEND/ELEMENTS") @@ -118,21 +126,24 @@ class ConfigParser: var_type = element.get("TYPE", "") self.process_variable_structure(receive_vars, tag, var_type) - return send_vars, receive_vars + return receive_vars, send_vars + except ET.ParseError as e: + logging.error(f"XML parse error in config file: {e}") + raise RSIConfigParseError(f"Failed to parse {self.config_file}: {e}") from e except Exception as e: logging.error(f"Error processing config file: {e}") - return {}, {} + raise RSIConfigError(f"Config processing failed: {e}") from e - def process_variable_structure(self, var_dict, tag, var_type, indx=""): + def process_variable_structure(self, var_dict: Dict[str, Any], tag: str, var_type: str, indx: str = "") -> None: """ - Processes and assigns a variable to the dictionary based on its tag and type. + Process and assign a variable to the dictionary based on its tag and type. Args: - var_dict (dict): Dictionary to add variable to. - tag (str): Variable tag (can be nested like Tech.T1). - var_type (str): Variable type (e.g. BOOL, DOUBLE, STRING). - indx (str): Optional index (unused). + var_dict: Dictionary to add variable to + tag: Variable tag (can be nested like Tech.T1) + var_type: Variable type (e.g. BOOL, DOUBLE, STRING) + indx: Optional index (unused, reserved for future use) """ tag = tag.replace("DEF_", "") # Remove DEF_ prefix if present @@ -151,14 +162,17 @@ class ConfigParser: var_dict[tag] = self.get_default_value(var_type) @staticmethod - def rename_tech_keys(var_dict): + def rename_tech_keys(var_dict: Dict[str, Any]) -> None: """ - Combines all Tech.XX keys into a single 'Tech' dictionary. + Combine all Tech.XX keys into a single 'Tech' dictionary. + + Modifies var_dict in-place by extracting all keys starting with 'Tech.' + and merging them into a single 'Tech' entry. Args: - var_dict (dict): The variable dictionary to modify. + var_dict: The variable dictionary to modify """ - tech_data = {} + tech_data: Dict[str, Any] = {} for key in list(var_dict.keys()): if key.startswith("Tech."): tech_data.update(var_dict.pop(key)) @@ -166,15 +180,15 @@ class ConfigParser: var_dict["Tech"] = tech_data @staticmethod - def get_default_value(var_type): + def get_default_value(var_type: str) -> Union[bool, str, int, float, None]: """ - Returns a default Python value based on RSI TYPE. + Get default Python value based on RSI TYPE attribute. Args: - var_type (str): RSI type attribute. + var_type: RSI type attribute (BOOL, STRING, LONG, DOUBLE) Returns: - Default Python value. + Default Python value appropriate for the type """ if var_type == "BOOL": return False @@ -186,11 +200,15 @@ class ConfigParser: return 0.0 return None - def get_network_settings(self): + def get_network_settings(self) -> Dict[str, Any]: """ - Returns extracted IP, port, and message mode settings. + Get extracted IP, port, and message mode settings. Returns: - dict: Network settings extracted from the config file. + Dictionary containing network configuration: + - ip: IP address to bind to + - port: UDP port number + - sentype: Message type identifier + - onlysend: Whether to only send (no receive expected) """ return self.network_settings diff --git a/src/RSIPI/diagnostics_api.py b/src/RSIPI/diagnostics_api.py new file mode 100644 index 0000000..83f0244 --- /dev/null +++ b/src/RSIPI/diagnostics_api.py @@ -0,0 +1,118 @@ +"""Diagnostics API namespace for RSIPI (Phase 2 placeholder).""" + +import logging +from typing import Dict, Any, TYPE_CHECKING + +if TYPE_CHECKING: + from .rsi_client import RSIClient + + +class DiagnosticsAPI: + """ + Network and performance diagnostics interface for KUKA RSI robot control. + + Placeholder for Phase 2 features including: + - Timing instrumentation (latency, jitter, cycle time) + - Network quality monitoring (packet loss, IPOC gaps) + - Watchdog timers + - Communication health checks + + This namespace is currently a placeholder and will be fully implemented + in Phase 2 of the RSIPI improvement roadmap. + """ + + def __init__(self, client: 'RSIClient') -> None: + """ + Initialize DiagnosticsAPI namespace. + + Args: + client: RSIClient instance + """ + self.client = client + logging.debug("DiagnosticsAPI initialized (Phase 2 placeholder)") + + def get_stats(self) -> Dict[str, Any]: + """ + Get network and performance statistics. + + Returns: + Dictionary with diagnostic information + + Note: + This is a Phase 2 placeholder. Currently returns basic status only. + + TODO (Phase 2): + - Packet loss rate + - IPOC gap detection + - Average/max/min cycle time + - Jitter measurements + - Buffer health metrics + """ + return { + "status": "Phase 2 placeholder", + "client_state": self.client.state.name if hasattr(self.client, 'state') else "unknown", + "is_running": self.client.is_running() if hasattr(self.client, 'is_running') else False, + "note": "Full diagnostics implementation coming in Phase 2" + } + + def get_timing(self) -> Dict[str, float]: + """ + Get timing metrics (latency, jitter, cycle time). + + Returns: + Dictionary with timing statistics + + Note: + This is a Phase 2 placeholder. + + TODO (Phase 2): + - Round-trip latency (min/max/avg) + - Cycle time distribution + - Jitter analysis + - Timing violations count + """ + return { + "note": "Phase 2 placeholder - timing instrumentation not yet implemented" + } + + def is_healthy(self) -> bool: + """ + Check overall system health. + + Returns: + True if system is healthy, False otherwise + + Note: + This is a Phase 2 placeholder. Currently checks only basic state. + + TODO (Phase 2): + - Watchdog timer status + - Communication timeout detection + - IPOC continuity check + - Buffer overflow detection + - Unexpected state detection + """ + if not hasattr(self.client, 'is_running'): + return False + return self.client.is_running() + + # TODO (Phase 2): Implement full diagnostic features + # def start_watchdog(self, timeout: float = 1.0) -> None: + # """Start watchdog timer for communication monitoring.""" + # pass + # + # def stop_watchdog(self) -> None: + # """Stop watchdog timer.""" + # pass + # + # def get_network_quality(self) -> Dict[str, float]: + # """Get network quality metrics (packet loss, latency variance).""" + # pass + # + # def get_ipoc_gaps(self) -> List[int]: + # """Detect gaps in IPOC sequence indicating missed packets.""" + # pass + # + # def reset_metrics(self) -> None: + # """Reset all diagnostic counters and statistics.""" + # pass diff --git a/src/RSIPI/exceptions.py b/src/RSIPI/exceptions.py new file mode 100644 index 0000000..caa5bbe --- /dev/null +++ b/src/RSIPI/exceptions.py @@ -0,0 +1,140 @@ +""" +Custom exception hierarchy for RSIPI library. + +Provides specific exception types for different failure modes to enable +targeted error handling in applications using RSIPI. +""" + + +class RSIError(Exception): + """Base exception for all RSIPI-related errors.""" + pass + + +# ============================================================================ +# Network & Communication Errors +# ============================================================================ + +class RSINetworkError(RSIError): + """Base class for network-related errors.""" + pass + + +class RSIConnectionError(RSINetworkError): + """Failed to establish or maintain connection with robot controller.""" + pass + + +class RSITimeoutError(RSINetworkError): + """Network operation timed out.""" + pass + + +class RSIPacketError(RSINetworkError): + """Invalid or corrupted packet received.""" + pass + + +# ============================================================================ +# Safety & Validation Errors +# ============================================================================ + +class RSISafetyError(RSIError): + """Base class for safety-related errors.""" + pass + + +class RSISafetyViolation(RSISafetyError): + """Motion command violates safety limits.""" + pass + + +class RSIEmergencyStop(RSISafetyError): + """Emergency stop is active, blocking all motion.""" + pass + + +class RSILimitExceeded(RSISafetyError): + """Value exceeds configured safety limits.""" + pass + + +# ============================================================================ +# Configuration Errors +# ============================================================================ + +class RSIConfigError(RSIError): + """Base class for configuration-related errors.""" + pass + + +class RSIConfigParseError(RSIConfigError): + """Failed to parse XML configuration file.""" + pass + + +class RSIConfigValidationError(RSIConfigError): + """Configuration file contains invalid values.""" + pass + + +class RSIMissingConfigError(RSIConfigError): + """Required configuration parameter is missing.""" + pass + + +# ============================================================================ +# State Machine Errors +# ============================================================================ + +class RSIStateError(RSIError): + """Base class for state machine errors.""" + pass + + +class RSIInvalidTransition(RSIStateError): + """Attempted invalid state transition.""" + pass + + +class RSIClientNotReady(RSIStateError): + """Client is not in appropriate state for requested operation.""" + pass + + +# ============================================================================ +# Data & Logging Errors +# ============================================================================ + +class RSIDataError(RSIError): + """Base class for data-related errors.""" + pass + + +class RSILoggingError(RSIDataError): + """Error during CSV logging operations.""" + pass + + +class RSIVariableError(RSIDataError): + """Invalid variable name or value.""" + pass + + +# ============================================================================ +# Trajectory & Motion Errors +# ============================================================================ + +class RSIMotionError(RSIError): + """Base class for motion-related errors.""" + pass + + +class RSITrajectoryError(RSIMotionError): + """Invalid trajectory definition or execution.""" + pass + + +class RSIKinematicsError(RSIMotionError): + """Kinematic calculation failure.""" + pass diff --git a/src/RSIPI/io_api.py b/src/RSIPI/io_api.py new file mode 100644 index 0000000..e79d1ec --- /dev/null +++ b/src/RSIPI/io_api.py @@ -0,0 +1,75 @@ +"""Digital I/O API namespace for RSIPI.""" + +import logging +from typing import Union, TYPE_CHECKING + +if TYPE_CHECKING: + from .rsi_client import RSIClient + + +class IOAPI: + """ + Digital I/O control interface for KUKA RSI robot control. + + Manages digital input/output signals for coordinating with external systems, + controlling pneumatic tools, and synchronizing with sensors. + """ + + def __init__(self, client: 'RSIClient') -> None: + """ + Initialize IOAPI namespace. + + Args: + client: RSIClient instance for variable access + """ + self.client = client + + def toggle(self, group: str, name: str, state: Union[bool, int]) -> str: + """ + Set a digital I/O variable to the specified state. + + Args: + group: Parent I/O variable group (e.g., 'Digout', 'DiO', 'DiL') + name: I/O channel name or number within the group (e.g., 'o1', '1') + state: Desired state (True/False or 1/0) + + Returns: + Status message indicating success or failure + + Raises: + RSIVariableError: If the specified I/O group or channel doesn't exist + RSISafetyViolation: If safety checks prevent the operation + + Example: + >>> api.io.toggle('Digout', 'o1', True) # Turn on output 1 + 'Updated Digout.o1 to 1' + >>> api.io.toggle('DiL', '5', False) # Turn off input latch 5 + 'Updated DiL.5 to 0' + + Note: + This method goes through the full safety validation chain. I/O + variables can have safety limits configured just like motion axes. + """ + var_name = f"{group}.{name}" + state_value = int(bool(state)) # Ensure binary 0 or 1 + + # Import here to avoid circular dependency + from .tools_api import ToolsAPI + + tools = ToolsAPI(self.client) + result = tools.update_variable(var_name, state_value) + logging.debug(f"I/O {var_name} set to {state_value}") + return result + + # TODO (Phase 3): Implement high-level I/O helpers + # def set_output(self, channel: int, value: bool) -> str: + # """Set digital output by channel number.""" + # pass + # + # def get_input(self, channel: int) -> bool: + # """Read digital input by channel number.""" + # pass + # + # def pulse(self, channel: int, duration: float = 0.1) -> None: + # """Generate a pulse on the specified output channel.""" + # pass diff --git a/src/RSIPI/krl_api.py b/src/RSIPI/krl_api.py new file mode 100644 index 0000000..36b3da2 --- /dev/null +++ b/src/RSIPI/krl_api.py @@ -0,0 +1,133 @@ +"""KRL program manipulation API namespace for RSIPI.""" + +import logging +from typing import Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from .rsi_client import RSIClient + + +class KRLAPI: + """ + KUKA Robot Language (KRL) program manipulation interface. + + Provides utilities for parsing KRL programs, extracting coordinate data, + and injecting RSI control commands into existing KRL workflows. + """ + + def __init__(self, client: 'RSIClient') -> None: + """ + Initialize KRLAPI namespace. + + Args: + client: RSIClient instance (currently unused, reserved for future features) + """ + self.client = client + + @staticmethod + def parse_to_csv(src_file: str, dat_file: str, output_file: str) -> str: + """ + Parse KRL source and data files, extract coordinates to CSV. + + Reads .src (program logic) and .dat (position data) files, extracts + point definitions and movement commands, and exports to CSV format. + + Args: + src_file: Path to KRL .src program file + dat_file: Path to KRL .dat data file + output_file: Path for output CSV file + + Returns: + Status message indicating success or failure + + Raises: + FileNotFoundError: If source or data files don't exist + Exception: If parsing fails + + Example: + >>> api.krl.parse_to_csv('robot_prog.src', 'robot_prog.dat', 'output.csv') + 'KRL data successfully exported to output.csv' + + Note: + The parser extracts position data (E6POS, E6AXIS, FRAME) from the + .dat file and matches them with movement commands (PTP, LIN, CIRC) + from the .src file. + """ + try: + from .krl_to_csv_parser import KRLParser + + parser = KRLParser(src_file, dat_file) + parser.parse_src() + parser.parse_dat() + parser.export_csv(output_file) + logging.info(f"KRL data exported to {output_file}") + return f"KRL data successfully exported to {output_file}" + except Exception as e: + logging.error(f"KRL parsing failed: {e}") + return f"Error parsing KRL files: {e}" + + @staticmethod + def inject_rsi(input_krl: str, output_krl: Optional[str] = None, rsi_config: str = "RSIGatewayv1.rsi") -> str: + """ + Inject RSI control commands into a KRL program. + + Automatically modifies a KRL .src file to include RSI initialization, + sensor communication, and cleanup code. This allows adding real-time + external control to existing robot programs. + + Args: + input_krl: Path to input KRL .src file + output_krl: Optional output path (defaults to overwriting input) + rsi_config: RSI configuration file name (default: 'RSIGatewayv1.rsi') + + Returns: + Status message indicating success or failure + + Raises: + FileNotFoundError: If input KRL file doesn't exist + Exception: If injection fails + + Example: + >>> # Modify in place + >>> api.krl.inject_rsi('robot_prog.src') + 'RSI successfully injected into robot_prog.src' + + >>> # Create new file + >>> api.krl.inject_rsi('robot_prog.src', 'robot_prog_rsi.src') + 'RSI successfully injected into robot_prog_rsi.src' + + Note: + The injection adds: + - RSI_CREATE() at program start + - RSI_ON() before motion commands + - RSI_MOVECORR() during movement + - RSI_OFF() after motion + This allows Python to send corrections during program execution. + """ + try: + from .inject_rsi_to_krl import inject_rsi_to_krl + + inject_rsi_to_krl(input_krl, output_krl, rsi_config) + output_path = output_krl if output_krl else input_krl + logging.info(f"RSI injected into {output_path}") + return f"RSI successfully injected into {output_path}" + except Exception as e: + logging.error(f"RSI injection failed: {e}") + return f"RSI injection failed: {e}" + + # TODO (Phase 3): Implement KRL coordination helpers + # def wait_for_signal(self, channel: int, timeout: float = 5.0) -> bool: + # """Wait for KRL to set a specific I/O signal.""" + # pass + # + # def signal_complete(self, channel: int) -> None: + # """Signal to KRL that Python-side operation is complete.""" + # pass + # + # def write_param(self, slot: str, value: float) -> str: + # """Write parameter to Tech.C variable for KRL to read.""" + # pass + # + # def read_param(self, slot: str) -> float: + # """Read parameter from Tech.T variable written by KRL.""" + # pass diff --git a/src/RSIPI/logging_api.py b/src/RSIPI/logging_api.py new file mode 100644 index 0000000..09a16e3 --- /dev/null +++ b/src/RSIPI/logging_api.py @@ -0,0 +1,148 @@ +"""CSV logging API namespace for RSIPI.""" + +import logging +import datetime +import os +from typing import Optional, TYPE_CHECKING +import pandas as pd + +if TYPE_CHECKING: + from .rsi_client import RSIClient + + +class LoggingAPI: + """ + CSV data logging interface for KUKA RSI robot control. + + Manages high-frequency data logging to CSV files with British date format + timestamps. Logging runs in a separate process to avoid timing interference + with the real-time control loop. + """ + + def __init__(self, client: 'RSIClient') -> None: + """ + Initialize LoggingAPI namespace. + + Args: + client: RSIClient instance for logging control + """ + self.client = client + + def start(self, filename: Optional[str] = None) -> str: + """ + Start CSV logging to file. + + Creates a background logging process that writes send/receive variables + to CSV with British date format timestamps (DD/MM/YYYY HH:MM:SS.mmm). + + Args: + filename: Optional output file path. Auto-generated if not provided + with format: logs/DD-MM-YYYY_HH-MM-SS.csv + + Returns: + Path to the log file being written + + Example: + >>> # Auto-generated filename + >>> path = api.logging.start() + >>> print(path) + logs/16-01-2026_14-32-45.csv + + >>> # Custom filename + >>> path = api.logging.start('my_experiment.csv') + >>> print(path) + my_experiment.csv + + Note: + Logging runs in a separate process and uses a queue-based buffering + system to prevent blocking the real-time control loop. If the queue + fills, old entries are dropped rather than blocking. + """ + if not filename: + timestamp = datetime.datetime.now().strftime("%d-%m-%Y_%H-%M-%S") + filename = f"logs/{timestamp}.csv" + + # Ensure logs directory exists + log_dir = os.path.dirname(filename) + if log_dir and not os.path.exists(log_dir): + os.makedirs(log_dir, exist_ok=True) + logging.info(f"Created logging directory: {log_dir}") + + self.client.start_logging(filename) + logging.info(f"CSV logging started: {filename}") + return filename + + def stop(self) -> str: + """ + Stop CSV logging. + + Signals the logging process to flush remaining data and close the file. + The logging process will terminate gracefully. + + Returns: + Status message + + Example: + >>> api.logging.stop() + 'CSV logging stopped' + + Note: + There may be a brief delay (up to 2 seconds) while the logging + process completes writing buffered data and shuts down. + """ + self.client.stop_logging() + logging.info("CSV logging stopped") + return "CSV logging stopped" + + def is_active(self) -> bool: + """ + Check if CSV logging is currently running. + + Returns: + True if logging process is active and writing data + + Example: + >>> api.logging.start('test.csv') + >>> api.logging.is_active() + True + >>> api.logging.stop() + >>> api.logging.is_active() + False + """ + return self.client.is_logging_active() + + def export(self, filename: str = "movement_log.csv") -> str: + """ + Export recorded movement data to CSV (if logger is attached). + + This is separate from the real-time CSV logging and is intended for + exporting pre-recorded data from an attached logger object. + + Args: + filename: Output CSV file path + + Returns: + Status message with export path + + Raises: + RuntimeError: If no logger is attached or no data available + + Example: + >>> api.logging.export('my_data.csv') + 'Movement data exported to my_data.csv' + + Note: + This method is currently reserved for future use with an attached + data logger. Real-time logging uses start()/stop() instead. + """ + if not hasattr(self.client, "logger") or self.client.logger is None: + raise RuntimeError("No logger attached to RSI client") + + data = self.client.get_movement_data() + if not data: + raise RuntimeError("No data available to export") + + df = pd.DataFrame(data) + df.to_csv(filename, index=False) + logging.info(f"Movement data exported to {filename}") + return f"Movement data exported to {filename}" diff --git a/src/RSIPI/monitoring_api.py b/src/RSIPI/monitoring_api.py new file mode 100644 index 0000000..274320e --- /dev/null +++ b/src/RSIPI/monitoring_api.py @@ -0,0 +1,192 @@ +"""Monitoring and live data API namespace for RSIPI.""" + +import logging +import time +import datetime +from typing import Dict, Any, Union, Optional, TYPE_CHECKING +import numpy as np +import pandas as pd + +if TYPE_CHECKING: + from .rsi_client import RSIClient + + +class MonitoringAPI: + """ + Real-time monitoring interface for KUKA RSI robot data. + + Provides access to live position, velocity, force, and IPOC data + in various formats for external processing and analysis. + """ + + def __init__(self, client: 'RSIClient') -> None: + """ + Initialize MonitoringAPI namespace. + + Args: + client: RSIClient instance for accessing receive variables + """ + self.client = client + + def get_live_data(self) -> Dict[str, Any]: + """ + Retrieve comprehensive real-time RSI data. + + Returns: + Dictionary containing: + - position: TCP position (RIst) {X, Y, Z, A, B, C} + - velocity: TCP velocity {X, Y, Z} + - acceleration: TCP acceleration {X, Y, Z} + - force: Joint motor currents (MaCur) {A1-A6} + - ipoc: Current interrupt point counter + + Example: + >>> data = api.monitoring.get_live_data() + >>> print(f"Position: {data['position']}") + Position: {'X': 600.5, 'Y': -200.3, 'Z': 1450.8, 'A': 0.0, 'B': 0.0, 'C': 0.0} + >>> print(f"IPOC: {data['ipoc']}") + IPOC: 123456 + """ + return { + "position": dict(self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0})), + "velocity": dict(self.client.receive_variables.get("Velocity", {"X": 0, "Y": 0, "Z": 0})), + "acceleration": dict(self.client.receive_variables.get("Acceleration", {"X": 0, "Y": 0, "Z": 0})), + "force": dict(self.client.receive_variables.get("MaCur", {"A1": 0, "A2": 0, "A3": 0, "A4": 0, "A5": 0, "A6": 0})), + "ipoc": self.client.receive_variables.get("IPOC", "N/A") + } + + def get_live_data_as_numpy(self) -> np.ndarray: + """ + Retrieve live RSI data as a NumPy array. + + Returns 2D array with rows: [position, velocity, acceleration, force] + and columns padded to max length (6 for force axes). + + Returns: + NumPy array (4 x max_length) with robot state data + + Example: + >>> arr = api.monitoring.get_live_data_as_numpy() + >>> print(arr.shape) + (4, 6) + >>> print(arr[0]) # Position row + [600.5 -200.3 1450.8 0.0 0.0 0.0] + """ + data = self.get_live_data() + flat = [] + + for section in ["position", "velocity", "acceleration", "force"]: + values = list(data[section].values()) + flat.append(values) + + # Pad to uniform length + max_len = max(len(row) for row in flat) + for row in flat: + row.extend([0.0] * (max_len - len(row))) + + return np.array(flat, dtype=np.float64) + + def get_live_data_as_dataframe(self) -> pd.DataFrame: + """ + Retrieve live RSI data as a Pandas DataFrame. + + Returns: + DataFrame with single row containing current robot state + + Example: + >>> df = api.monitoring.get_live_data_as_dataframe() + >>> print(df.columns) + Index(['position', 'velocity', 'acceleration', 'force', 'ipoc']) + >>> print(df['ipoc'][0]) + 123456 + """ + data = self.get_live_data() + return pd.DataFrame([data]) + + def get_ipoc(self) -> Union[int, str]: + """ + Get current IPOC (Interrupt Point Counter) value. + + The IPOC increments with each RSI cycle (typically every 4ms) and + is used for synchronization between client and controller. + + Returns: + Current IPOC value, or "N/A" if not available + + Example: + >>> ipoc = api.monitoring.get_ipoc() + >>> print(ipoc) + 123456 + """ + return self.client.receive_variables.get("IPOC", "N/A") + + def get_position(self) -> Dict[str, float]: + """ + Get current TCP position in Cartesian coordinates. + + Returns: + Dictionary with X, Y, Z (mm) and A, B, C (degrees) orientation + + Example: + >>> pos = api.monitoring.get_position() + >>> print(f"TCP at X={pos['X']}, Y={pos['Y']}, Z={pos['Z']}") + TCP at X=600.5, Y=-200.3, Z=1450.8 + """ + return dict(self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0, "A": 0, "B": 0, "C": 0})) + + def get_force(self) -> Dict[str, float]: + """ + Get current motor currents for all joints. + + Motor current is a proxy for force/torque applied at each joint. + Units depend on robot model and configuration. + + Returns: + Dictionary with A1-A6 motor current values + + Example: + >>> force = api.monitoring.get_force() + >>> print(f"Joint A1 current: {force['A1']}") + Joint A1 current: 12.5 + """ + return dict(self.client.receive_variables.get("MaCur", {"A1": 0, "A2": 0, "A3": 0, "A4": 0, "A5": 0, "A6": 0})) + + def watch_network(self, duration: Optional[float] = None, rate: float = 0.2) -> None: + """ + Continuously print live position and IPOC data to console. + + Useful for monitoring network communication health and robot movement + during testing and debugging. + + Args: + duration: Watch duration in seconds (None = until Ctrl+C) + rate: Update rate in seconds (default: 0.2 = 5 Hz) + + Example: + >>> # Watch for 10 seconds at 5Hz + >>> api.monitoring.watch_network(duration=10) + [14:32:01] IPOC: 123456 | RIst: {'X': 600.5, 'Y': -200.3, 'Z': 1450.8} + [14:32:01] IPOC: 123506 | RIst: {'X': 600.6, 'Y': -200.3, 'Z': 1450.8} + ... + + >>> # Watch indefinitely (Ctrl+C to stop) + >>> api.monitoring.watch_network() + """ + logging.info("Watching network... Press Ctrl+C to stop.\n") + start_time = time.time() + + try: + while True: + live_data = self.get_live_data() + ipoc = live_data.get("IPOC", "N/A") + rpos = live_data.get("position", {}) + timestamp = datetime.datetime.now().strftime('%H:%M:%S') + print(f"[{timestamp}] IPOC: {ipoc} | RIst: {rpos}") + time.sleep(rate) + + if duration and (time.time() - start_time) >= duration: + logging.info("Network watch duration completed.") + break + + except KeyboardInterrupt: + logging.info("\nStopped network watch.") diff --git a/src/RSIPI/motion_api.py b/src/RSIPI/motion_api.py new file mode 100644 index 0000000..f1b8af3 --- /dev/null +++ b/src/RSIPI/motion_api.py @@ -0,0 +1,523 @@ +"""Motion control API namespace for RSIPI.""" + +import logging +import asyncio +from typing import Dict, List, Any, Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from .rsi_client import RSIClient + + +class MotionAPI: + """ + Motion control interface for KUKA RSI robot control. + + Provides Cartesian and joint-space motion commands, trajectory generation, + execution, and queueing capabilities. All motion commands go through the + SafetyManager validation layer. + """ + + def __init__(self, client: 'RSIClient') -> None: + """ + Initialize MotionAPI namespace. + + Args: + client: RSIClient instance for variable access and safety management + """ + self.client = client + self.trajectory_queue: List[Dict[str, Any]] = [] + + def update_cartesian(self, **kwargs: float) -> None: + """ + Update Cartesian correction values (RKorr). + + Applies corrections to TCP position in world coordinates. Values are + added to the programmed path positions in the KRL program. + + Args: + **kwargs: Axis corrections in mm/degrees: + - X, Y, Z: Position corrections (mm) + - A, B, C: Orientation corrections (degrees) + + Raises: + RSISafetyViolation: If corrections exceed configured limits + + Example: + >>> # Move TCP 10mm in X direction + >>> api.motion.update_cartesian(X=10.0) + + >>> # Move in XYZ + >>> api.motion.update_cartesian(X=5.0, Y=-3.0, Z=12.5) + + >>> # Full 6-axis correction + >>> api.motion.update_cartesian(X=10, Y=5, Z=0, A=0, B=0, C=2.5) + + Note: + RKorr must be configured in the RSI config file and enabled in KRL + using RSI_MOVECORR() for corrections to take effect. + """ + if "RKorr" not in self.client.send_variables: + logging.warning("RKorr not configured in send_variables. Skipping Cartesian update.") + return + + # Import here to avoid circular dependency + from .tools_api import ToolsAPI + tools = ToolsAPI(self.client) + + for axis, value in kwargs.items(): + tools.update_variable(f"RKorr.{axis}", float(value)) + logging.debug(f"RKorr.{axis} set to {value}") + + def update_joints(self, **kwargs: float) -> None: + """ + Update joint correction values (AKorr). + + Applies corrections to individual joint angles. Values are added to + the programmed joint positions in the KRL program. + + Args: + **kwargs: Joint corrections in degrees: + - A1, A2, A3, A4, A5, A6: Joint angle corrections + + Raises: + RSISafetyViolation: If corrections exceed configured limits + + Example: + >>> # Adjust joint 1 by 5 degrees + >>> api.motion.update_joints(A1=5.0) + + >>> # Multi-axis correction + >>> api.motion.update_joints(A1=10.0, A2=-5.0, A3=2.5) + + Note: + AKorr must be configured in the RSI config file and enabled in KRL + using RSI_MOVECORR() for corrections to take effect. + """ + if "AKorr" not in self.client.send_variables: + logging.warning("AKorr not configured in send_variables. Skipping Joint update.") + return + + from .tools_api import ToolsAPI + tools = ToolsAPI(self.client) + + for axis, value in kwargs.items(): + tools.update_variable(f"AKorr.{axis}", float(value)) + logging.debug(f"AKorr.{axis} set to {value}") + + def correct_position(self, correction_type: str, axis: str, value: float) -> str: + """ + Apply a single correction to RKorr or AKorr. + + Lower-level method for explicit correction type and axis specification. + Most users should use update_cartesian() or update_joints() instead. + + Args: + correction_type: 'RKorr' or 'AKorr' + axis: Axis name (e.g., 'X', 'Y', 'Z' for RKorr; 'A1'-'A6' for AKorr) + value: Correction value + + Returns: + Status message + + Raises: + RSISafetyViolation: If correction exceeds configured limits + + Example: + >>> api.motion.correct_position('RKorr', 'X', 10.0) + 'Updated RKorr.X to 10.0' + >>> api.motion.correct_position('AKorr', 'A1', 5.0) + 'Updated AKorr.A1 to 5.0' + """ + from .tools_api import ToolsAPI + tools = ToolsAPI(self.client) + return tools.update_variable(f"{correction_type}.{axis}", value) + + def move_external_axis(self, axis: str, value: float) -> str: + """ + Move an external axis. + + Controls additional axes beyond the standard 6 robot axes, such as + positioners, linear tracks, or tool changers. + + Args: + axis: External axis name (e.g., 'E1', 'E2', 'E3') + value: Position value (units depend on axis configuration) + + Returns: + Status message + + Raises: + RSISafetyViolation: If value exceeds configured limits + + Example: + >>> # Move linear track (E1) to 500mm + >>> api.motion.move_external_axis('E1', 500.0) + 'Updated ELPos.E1 to 500.0' + """ + from .tools_api import ToolsAPI + tools = ToolsAPI(self.client) + return tools.update_variable(f"ELPos.{axis}", value) + + def adjust_speed(self, tech_param: str, value: float) -> str: + """ + Adjust motion parameters via Tech variables. + + Tech variables allow runtime adjustment of motion parameters like + velocity scaling, acceleration limits, or custom user parameters. + + Args: + tech_param: Tech variable path (e.g., 'Tech.T21', 'Tech.C15') + value: Parameter value + + Returns: + Status message + + Example: + >>> # Adjust velocity override via Tech.T21 + >>> api.motion.adjust_speed('Tech.T21', 0.5) # 50% velocity + 'Updated Tech.T21 to 0.5' + + Note: + Tech variable meanings depend on your KRL program implementation. + Coordinate with your KRL developer on parameter assignments. + """ + from .tools_api import ToolsAPI + tools = ToolsAPI(self.client) + return tools.update_variable(tech_param, value) + + @staticmethod + def generate_trajectory( + start: Dict[str, float], + end: Dict[str, float], + steps: int = 100, + space: str = "cartesian", + mode: str = "absolute", + include_resets: bool = False + ) -> List[Dict[str, float]]: + """ + Generate linear interpolated trajectory between two poses. + + Creates a list of waypoints linearly interpolated between start and + end positions. Supports both Cartesian and joint space. + + Args: + start: Starting pose (e.g., {"X":0, "Y":0, "Z":500}) + end: Ending pose (e.g., {"X":100, "Y":0, "Z":500}) + steps: Number of interpolation points (default: 100) + space: 'cartesian' or 'joint' + mode: 'absolute' or 'relative' (reserved for future use) + include_resets: Whether to reset to zero at end (default: False) + + Returns: + List of waypoint dictionaries + + Example: + >>> # Cartesian trajectory + >>> traj = api.motion.generate_trajectory( + ... {"X":0, "Y":0, "Z":500}, + ... {"X":100, "Y":0, "Z":500}, + ... steps=50 + ... ) + >>> len(traj) + 50 + + >>> # Joint trajectory + >>> traj = api.motion.generate_trajectory( + ... {"A1":0, "A2":0, "A3":0}, + ... {"A1":30, "A2":-15, "A3":45}, + ... steps=100, + ... space="joint" + ... ) + + Note: + This uses simple linear interpolation. For velocity-profiled + trajectories, see Phase 4 enhancements (trapezoidal/S-curve). + """ + from .trajectory_planner import generate_trajectory as gen_traj + return gen_traj(start, end, steps, space, mode, include_resets) + + def execute_trajectory( + self, + trajectory: List[Dict[str, float]], + space: str = "cartesian", + rate: float = 0.012 + ) -> None: + """ + Execute a trajectory asynchronously. + + Sends waypoints sequentially to the robot at the specified rate. + Uses asyncio for non-blocking execution. + + Args: + trajectory: List of waypoint dictionaries + space: 'cartesian' or 'joint' + rate: Time between waypoints in seconds (default: 0.012 = ~80Hz) + + Raises: + RSITrajectoryError: If space is invalid + + Example: + >>> # Generate and execute Cartesian trajectory + >>> traj = api.motion.generate_trajectory( + ... {"X":0, "Y":0, "Z":500}, + ... {"X":100, "Y":0, "Z":500}, + ... steps=50 + ... ) + >>> api.motion.execute_trajectory(traj, space="cartesian", rate=0.02) + + Note: + This method uses asyncio. If no event loop is running, one will + be created automatically. The trajectory executes in the background. + """ + from .exceptions import RSITrajectoryError + + async def runner(): + for idx, point in enumerate(trajectory): + if space == "cartesian": + self.update_cartesian(**point) + elif space == "joint": + self.update_joints(**point) + else: + raise RSITrajectoryError("space must be 'cartesian' or 'joint'") + logging.debug(f"Trajectory step {idx + 1}/{len(trajectory)}") + await asyncio.sleep(rate) + + try: + loop = asyncio.get_running_loop() + asyncio.create_task(runner()) + except RuntimeError: + # No event loop running, create one + asyncio.run(runner()) + + def move_cartesian_trajectory( + self, + start_pose: Dict[str, float], + end_pose: Dict[str, float], + steps: int = 50, + rate: float = 0.012 + ) -> None: + """ + Generate and execute Cartesian trajectory in one call. + + Convenience method that combines generate_trajectory() and + execute_trajectory() for Cartesian motion. + + Args: + start_pose: Starting Cartesian pose + end_pose: Ending Cartesian pose + steps: Number of waypoints (default: 50) + rate: Time between waypoints in seconds (default: 0.012) + + Example: + >>> api.motion.move_cartesian_trajectory( + ... {"X":0, "Y":0, "Z":500}, + ... {"X":100, "Y":0, "Z":500}, + ... steps=50, + ... rate=0.02 + ... ) + """ + trajectory = self.generate_trajectory(start_pose, end_pose, steps=steps, space="cartesian") + self.execute_trajectory(trajectory, space="cartesian", rate=rate) + + def move_joint_trajectory( + self, + start_joints: Dict[str, float], + end_joints: Dict[str, float], + steps: int = 50, + rate: float = 0.4 + ) -> None: + """ + Generate and execute joint-space trajectory in one call. + + Convenience method for joint-space motion with sensible defaults + (slower rate typical for joint motion). + + Args: + start_joints: Starting joint configuration + end_joints: Ending joint configuration + steps: Number of waypoints (default: 50) + rate: Time between waypoints in seconds (default: 0.4 for smooth joint motion) + + Example: + >>> api.motion.move_joint_trajectory( + ... {"A1":0, "A2":0, "A3":0, "A4":0, "A5":0, "A6":0}, + ... {"A1":30, "A2":-15, "A3":45, "A4":0, "A5":30, "A6":0}, + ... steps=100 + ... ) + """ + trajectory = self.generate_trajectory(start_joints, end_joints, steps=steps, space="joint") + self.execute_trajectory(trajectory, space="joint", rate=rate) + + def queue_trajectory( + self, + trajectory: List[Dict[str, float]], + space: str = "cartesian", + rate: float = 0.012 + ) -> None: + """ + Add trajectory to execution queue without immediate execution. + + Allows building up a sequence of trajectories that can be executed + together via execute_queued_trajectories(). + + Args: + trajectory: List of waypoint dictionaries + space: 'cartesian' or 'joint' + rate: Time between waypoints in seconds + + Example: + >>> # Queue multiple trajectories + >>> traj1 = api.motion.generate_trajectory(p0, p1, 50) + >>> traj2 = api.motion.generate_trajectory(p1, p2, 50) + >>> api.motion.queue_trajectory(traj1) + >>> api.motion.queue_trajectory(traj2) + >>> api.motion.execute_queued_trajectories() + """ + self.trajectory_queue.append({ + "trajectory": trajectory, + "space": space, + "rate": rate, + }) + logging.debug(f"Queued trajectory: {len(trajectory)} points, {space} space") + + def queue_cartesian_trajectory( + self, + start_pose: Dict[str, float], + end_pose: Dict[str, float], + steps: int = 50, + rate: float = 0.012 + ) -> None: + """ + Generate and queue Cartesian trajectory. + + Args: + start_pose: Starting Cartesian pose + end_pose: Ending Cartesian pose + steps: Number of waypoints + rate: Time between waypoints in seconds + + Raises: + ValueError: If poses are invalid or parameters out of range + + Example: + >>> api.motion.queue_cartesian_trajectory( + ... {"X":0, "Y":0, "Z":500}, + ... {"X":100, "Y":0, "Z":500} + ... ) + """ + if not isinstance(start_pose, dict) or not isinstance(end_pose, dict): + raise ValueError("start_pose and end_pose must be dictionaries") + if steps <= 0: + raise ValueError("Steps must be greater than zero") + if rate <= 0: + raise ValueError("Rate must be greater than zero") + + trajectory = self.generate_trajectory(start_pose, end_pose, steps=steps, space="cartesian") + self.queue_trajectory(trajectory, "cartesian", rate) + + def queue_joint_trajectory( + self, + start_joints: Dict[str, float], + end_joints: Dict[str, float], + steps: int = 50, + rate: float = 0.4 + ) -> None: + """ + Generate and queue joint-space trajectory. + + Args: + start_joints: Starting joint configuration + end_joints: Ending joint configuration + steps: Number of waypoints + rate: Time between waypoints in seconds + + Raises: + ValueError: If joints are invalid or parameters out of range + + Example: + >>> api.motion.queue_joint_trajectory( + ... {"A1":0, "A2":0, "A3":0}, + ... {"A1":30, "A2":-15, "A3":45} + ... ) + """ + if not isinstance(start_joints, dict) or not isinstance(end_joints, dict): + raise ValueError("start_joints and end_joints must be dictionaries") + if steps <= 0: + raise ValueError("Steps must be greater than zero") + if rate <= 0: + raise ValueError("Rate must be greater than zero") + + trajectory = self.generate_trajectory(start_joints, end_joints, steps=steps, space="joint") + self.queue_trajectory(trajectory, "joint", rate) + + def execute_queued_trajectories(self) -> None: + """ + Execute all queued trajectories in sequence. + + Processes the trajectory queue in FIFO order, executing each with its + configured space and rate. Clears the queue after execution. + + Example: + >>> api.motion.queue_cartesian_trajectory(p0, p1, 50) + >>> api.motion.queue_cartesian_trajectory(p1, p2, 50) + >>> api.motion.execute_queued_trajectories() + >>> # Both trajectories executed sequentially + """ + logging.info(f"Executing {len(self.trajectory_queue)} queued trajectories") + for idx, item in enumerate(self.trajectory_queue): + logging.debug(f"Executing queued trajectory {idx + 1}/{len(self.trajectory_queue)}") + self.execute_trajectory(item["trajectory"], item["space"], item["rate"]) + self.clear_queue() + + def clear_queue(self) -> None: + """ + Clear all queued trajectories without execution. + + Example: + >>> api.motion.queue_cartesian_trajectory(p0, p1, 50) + >>> api.motion.clear_queue() # Discard without executing + """ + count = len(self.trajectory_queue) + self.trajectory_queue.clear() + logging.debug(f"Cleared {count} queued trajectories") + + def get_queue(self) -> List[Dict[str, Any]]: + """ + Get metadata about queued trajectories. + + Returns summary information (space, step count, rate) without the + full trajectory data. + + Returns: + List of trajectory metadata dictionaries + + Example: + >>> api.motion.queue_cartesian_trajectory(p0, p1, 50) + >>> api.motion.queue_cartesian_trajectory(p1, p2, 100) + >>> queue = api.motion.get_queue() + >>> for item in queue: + ... print(f"{item['space']}: {item['steps']} steps at {item['rate']}s") + cartesian: 50 steps at 0.012s + cartesian: 100 steps at 0.012s + """ + return [ + {"space": item["space"], "steps": len(item["trajectory"]), "rate": item["rate"]} + for item in self.trajectory_queue + ] + + # TODO (Phase 4): Implement advanced motion features + # def generate_velocity_profile(self, trajectory, profile='trapezoidal'): + # """Apply velocity profiling to trajectory waypoints.""" + # pass + # + # def generate_arc(self, center, radius, start_angle, end_angle, steps): + # """Generate circular arc trajectory.""" + # pass + # + # def generate_circle(self, center, radius, steps): + # """Generate full circle trajectory.""" + # pass + # + # def blend_trajectories(self, traj1, traj2, blend_radius): + # """Smooth transition between two trajectories.""" + # pass diff --git a/src/RSIPI/network_handler.py b/src/RSIPI/network_handler.py index c9d6cb3..e9d3476 100644 --- a/src/RSIPI/network_handler.py +++ b/src/RSIPI/network_handler.py @@ -5,22 +5,42 @@ import xml.etree.ElementTree as ET import os import datetime from queue import Empty +from typing import Dict, Any, Tuple, Optional from .xml_handler import XMLGenerator from .safety_manager import SafetyManager +from .exceptions import RSINetworkError, RSITimeoutError, RSIPacketError, RSILoggingError class CSVLogger(multiprocessing.Process): - """Separate process for writing CSV logs without blocking the network loop.""" + """ + Separate process for writing CSV logs without blocking the network loop. - def __init__(self, log_queue, stop_event, filename): + Runs in background and consumes log entries from a queue, writing them + to CSV file with British date format timestamps. + """ + + def __init__(self, log_queue: multiprocessing.Queue, stop_event: multiprocessing.Event, filename: str) -> None: + """ + Initialize CSV logger process. + + Args: + log_queue: Queue containing log entry dictionaries + stop_event: Event to signal shutdown + filename: Path to output CSV file + """ super().__init__() - self.log_queue = log_queue - self.stop_event = stop_event - self.filename = filename + self.log_queue: multiprocessing.Queue = log_queue + self.stop_event: multiprocessing.Event = stop_event + self.filename: str = filename self.daemon = True - def run(self): - """Write log entries from queue to CSV file.""" + def run(self) -> None: + """ + Write log entries from queue to CSV file. + + Creates directory if needed, writes header on first entry, + timestamps each row with British date format (DD/MM/YYYY HH:MM:SS.mmm). + """ # Ensure logs directory exists log_dir = os.path.dirname(self.filename) if log_dir and not os.path.exists(log_dir): @@ -58,31 +78,65 @@ class CSVLogger(multiprocessing.Process): class NetworkProcess(multiprocessing.Process): - """Handles UDP communication and optional CSV logging in a separate process.""" + """ + Handles UDP communication and CSV logging in a separate process. - def __init__(self, ip, port, send_variables, receive_variables, stop_event, config_parser, start_event, command_queue): + Manages bidirectional UDP communication with KUKA robot controller, + including IPOC synchronization, variable updates, and optional CSV logging. + Runs in separate process to avoid GIL contention with main thread. + """ + + def __init__( + self, + ip: str, + port: int, + send_variables: Any, # multiprocessing.Manager().dict() + receive_variables: Any, # multiprocessing.Manager().dict() + stop_event: multiprocessing.Event, + config_parser: Any, # ConfigParser type + start_event: multiprocessing.Event, + command_queue: multiprocessing.Queue + ) -> None: + """ + Initialize network process. + + Args: + ip: IP address to bind UDP socket to + port: UDP port number + send_variables: Shared dict for variables to send to robot + receive_variables: Shared dict for variables received from robot + stop_event: Event to signal shutdown + config_parser: ConfigParser instance with network settings + start_event: Event to signal when to start communication + command_queue: Queue for receiving commands from parent process + """ super().__init__() self.send_variables = send_variables self.receive_variables = receive_variables - self.stop_event = stop_event - self.start_event = start_event + self.stop_event: multiprocessing.Event = stop_event + self.start_event: multiprocessing.Event = start_event self.config_parser = config_parser - self.command_queue = command_queue - self.safety_manager = SafetyManager(config_parser.safety_limits) + self.command_queue: multiprocessing.Queue = command_queue + self.safety_manager: SafetyManager = SafetyManager(config_parser.safety_limits) - self.client_address = (ip, port) - self.logging_active = multiprocessing.Value('b', False) + self.client_address: Tuple[str, int] = (ip, port) + self.logging_active: Any = multiprocessing.Value('b', False) # c_bool wrapper - self.controller_ip_and_port = None - self.udp_socket = None + self.controller_ip_and_port: Optional[Tuple[str, int]] = None + self.udp_socket: Optional[socket.socket] = None # Logging infrastructure (created when logging starts) - self.log_queue = None - self.log_stop_event = None - self.csv_logger = None + self.log_queue: Optional[multiprocessing.Queue] = None + self.log_stop_event: Optional[multiprocessing.Event] = None + self.csv_logger: Optional[CSVLogger] = None - def run(self): - """Start the network loop.""" + def run(self) -> None: + """ + Start the network loop. + + Waits for start signal, then initializes socket and begins + communication loop. Ensures cleanup on exit. + """ # Wait for start signal, but check stop_event periodically to allow clean shutdown while not self.start_event.wait(timeout=0.5): if self.stop_event.is_set(): @@ -95,8 +149,12 @@ class NetworkProcess(multiprocessing.Process): finally: self._cleanup() - def _setup_socket(self): - """Create and bind the UDP socket.""" + def _setup_socket(self) -> None: + """ + Create and bind the UDP socket. + + Falls back to 0.0.0.0 if specified IP is invalid. + """ if not self.is_valid_ip(self.client_address[0]): logging.warning(f"Invalid IP address '{self.client_address[0]}'. Falling back to '0.0.0.0'.") self.client_address = ('0.0.0.0', self.client_address[1]) @@ -106,8 +164,13 @@ class NetworkProcess(multiprocessing.Process): self.udp_socket.bind(self.client_address) logging.info(f"Network process bound on {self.client_address}") - def _run_loop(self): - """Main communication loop.""" + def _run_loop(self) -> None: + """ + Main communication loop. + + Receives UDP messages from robot, processes them, sends responses, + and optionally logs data to CSV. + """ while not self.stop_event.is_set(): # Check for commands (non-blocking) self._process_commands() @@ -128,7 +191,7 @@ class NetworkProcess(multiprocessing.Process): except Exception as e: logging.error(f"Network process error: {e}") - def _process_commands(self): + def _process_commands(self) -> None: """Process any pending commands from the parent process.""" try: while True: @@ -147,7 +210,7 @@ class NetworkProcess(multiprocessing.Process): except Exception as e: logging.error(f"Error processing command: {e}") - def _queue_log_entry(self): + def _queue_log_entry(self) -> None: """Queue current state for CSV logging (non-blocking).""" try: entry = {} @@ -176,8 +239,13 @@ class NetworkProcess(multiprocessing.Process): except Exception as e: logging.debug(f"Failed to queue log entry: {e}") - def _start_logging(self, filename): - """Start CSV logging to the specified file.""" + def _start_logging(self, filename: str) -> None: + """ + Start CSV logging to the specified file. + + Args: + filename: Path to CSV output file + """ if self.logging_active.value: logging.warning("Logging already active") return @@ -191,8 +259,8 @@ class NetworkProcess(multiprocessing.Process): self.logging_active.value = True logging.info(f"CSV logging started: {filename}") - def _stop_logging(self): - """Stop CSV logging.""" + def _stop_logging(self) -> None: + """Stop CSV logging and cleanup resources.""" if not self.logging_active.value: return @@ -217,8 +285,8 @@ class NetworkProcess(multiprocessing.Process): self.log_stop_event = None logging.info("CSV logging stopped") - def _cleanup(self): - """Clean up resources.""" + def _cleanup(self) -> None: + """Clean up resources on shutdown.""" # Stop logging first self._stop_logging() @@ -231,7 +299,16 @@ class NetworkProcess(multiprocessing.Process): self.udp_socket = None @staticmethod - def is_valid_ip(ip): + def is_valid_ip(ip: str) -> bool: + """ + Check if an IP address is valid and bindable. + + Args: + ip: IP address string to validate + + Returns: + True if IP is valid and can be bound to + """ try: socket.inet_aton(ip) with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: @@ -240,7 +317,18 @@ class NetworkProcess(multiprocessing.Process): except (socket.error, OSError): return False - def process_received_data(self, xml_string): + def process_received_data(self, xml_string: str) -> None: + """ + Parse received XML message and update receive_variables. + + Handles IPOC synchronization by echoing back IPOC+4. + + Args: + xml_string: XML message string from robot controller + + Raises: + RSIPacketError: If XML parsing fails + """ try: root = ET.fromstring(xml_string) for element in root: @@ -253,5 +341,9 @@ class NetworkProcess(multiprocessing.Process): received_ipoc = int(element.text) self.receive_variables["IPOC"] = received_ipoc self.send_variables["IPOC"] = received_ipoc + 4 + except ET.ParseError as e: + logging.error(f"XML parse error in received message: {e}") + raise RSIPacketError(f"Failed to parse received XML: {e}") from e except Exception as e: - logging.error(f"Error parsing received message: {e}") + logging.error(f"Error processing received message: {e}") + raise RSIPacketError(f"Unexpected error parsing packet: {e}") from e diff --git a/src/RSIPI/rsi_api.py b/src/RSIPI/rsi_api.py index 6fee0d7..a6abb42 100644 --- a/src/RSIPI/rsi_api.py +++ b/src/RSIPI/rsi_api.py @@ -1,610 +1,242 @@ +""" +RSIPI - Robot Sensor Interface Python Integration + +Main API orchestrator providing namespaced access to all RSI functionality. +""" + import logging -import pandas as pd -import numpy as np -import json -import matplotlib.pyplot as plt -from .kuka_visualiser import KukaRSIVisualiser -from .krl_to_csv_parser import KRLParser -from .inject_rsi_to_krl import inject_rsi_to_krl -import threading -from .trajectory_planner import generate_trajectory, execute_trajectory -import datetime -from .static_plotter import StaticPlotter -import os -from .live_plotter import LivePlotter from threading import Thread -import asyncio +from typing import Optional, TYPE_CHECKING + +from .motion_api import MotionAPI +from .io_api import IOAPI +from .krl_api import KRLAPI +from .safety_api import SafetyAPI +from .monitoring_api import MonitoringAPI +from .logging_api import LoggingAPI +from .diagnostics_api import DiagnosticsAPI +from .viz_api import VizAPI +from .tools_api import ToolsAPI + +if TYPE_CHECKING: + from .rsi_client import RSIClient, ClientState + class RSIAPI: - """RSI API for programmatic control, including alerts, logging, graphing, and data retrieval.""" + """ + High-level API orchestrator for KUKA RSI robot control. - def __init__(self, config_file="RSI_EthernetConfig.xml"): - """Initialize RSIAPI with an RSI client instance.""" - self.thread = None - self.config_file = config_file - self.client = None - self.graph_process = None - self.graphing_instance = None - self.graph_thread = None# - self.trajectory_queue = [] - self.live_plotter = None - self.live_plot_thread = None + Provides namespaced access to all RSIPI functionality through specialized + sub-APIs. This is the main entry point for most users. + Namespaces: + - motion: Motion control (Cartesian, joints, trajectories) + - io: Digital I/O control + - krl: KRL program manipulation utilities + - safety: Safety management and limits + - monitoring: Live data access and monitoring + - logging: CSV data logging + - diagnostics: Network and performance diagnostics (Phase 2) + - viz: Static and live visualization + - tools: Utilities, debugging, and inspection + + Core Methods (direct access): + - start(): Start RSI communication + - stop(): Stop RSI communication + - reconnect(): Restart network connection + - state: Current client state (property) + + Example: + >>> api = RSIAPI('RSI_EthernetConfig.xml') + >>> api.start() + >>> api.motion.update_cartesian(X=10, Y=5, Z=0) + >>> api.logging.start('test.csv') + >>> # ... robot operation ... + >>> api.logging.stop() + >>> api.stop() + >>> api.viz.plot_static('test.csv', '3d') + """ + + def __init__(self, config_file: str = "RSI_EthernetConfig.xml") -> None: + """ + Initialize RSIAPI with configuration file. + + Creates RSIClient instance (lazy initialization) and sets up all + namespace APIs for organized access to functionality. + + Args: + config_file: Path to RSI_EthernetConfig.xml configuration file + + Example: + >>> api = RSIAPI('config/RSI_EthernetConfig.xml') + >>> print(api.state) + ClientState.INITIALIZED + """ + self.config_file: str = config_file + self.client: Optional['RSIClient'] = None + self._thread: Optional[Thread] = None + + # Initialize client self._ensure_client() - def _ensure_client(self): - """Ensure RSIClient is initialised before use.""" + # Initialize namespace APIs + self.motion = MotionAPI(self.client) + self.io = IOAPI(self.client) + self.krl = KRLAPI(self.client) + self.safety = SafetyAPI(self.client) + self.monitoring = MonitoringAPI(self.client) + self.logging = LoggingAPI(self.client) + self.diagnostics = DiagnosticsAPI(self.client) + self.viz = VizAPI(self.client) + self.tools = ToolsAPI(self.client) + + logging.info("RSIAPI initialized with namespaced structure") + + def _ensure_client(self) -> None: + """ + Ensure RSIClient is initialized (lazy initialization). + + Imports and creates RSIClient only when needed, avoiding circular + dependencies and improving startup time. + """ if self.client is None: from .rsi_client import RSIClient self.client = RSIClient(self.config_file) + logging.debug("RSIClient initialized") - def start_rsi(self): + @property + def state(self) -> 'ClientState': + """ + Get current client state. - self.thread = threading.Thread(target=self.client.start, daemon=True) - self.thread.start() - return "RSI started in background." + Returns: + ClientState enum value (INITIALIZED, STARTING, RUNNING, STOPPING, STOPPED, ERROR) - def stop_rsi(self): - """Stop the RSI client.""" + Example: + >>> api = RSIAPI() + >>> print(api.state) + ClientState.INITIALIZED + >>> api.start() + >>> print(api.state) + ClientState.RUNNING + """ + return self.client.state + + def start(self) -> str: + """ + Start RSI communication in background thread. + + Creates a daemon thread that runs the RSI client's main communication + loop. The thread handles UDP message exchange with the robot controller. + + Returns: + Status message + + Raises: + RSIClientNotReady: If client is not in appropriate state to start + + Example: + >>> api = RSIAPI() + >>> api.start() + 'RSI started in background' + >>> api.state + ClientState.RUNNING + + Note: + The background thread runs as a daemon, so it will automatically + terminate when the main program exits. + """ + self._thread = Thread(target=self.client.start, daemon=True) + self._thread.start() + logging.info("RSI communication started in background thread") + return "RSI started in background" + + def stop(self) -> str: + """ + Stop RSI communication. + + Gracefully shuts down the network process, closes sockets, and + stops any active logging. Waits for background threads to complete. + + Returns: + Status message + + Example: + >>> api.stop() + 'RSI stopped' + + Note: + This method blocks until the network process has fully shut down, + which typically takes 1-3 seconds. + """ self.client.stop() - return "RSI stopped." + logging.info("RSI communication stopped") + return "RSI stopped" - @staticmethod - def generate_report(filename, format_type): + def reconnect(self) -> str: """ - Generate a statistical report from a CSV log file. + Restart network connection without stopping RSI client. - Args: - filename (str): Path to the CSV file (or base name without .csv). - format_type (str): 'csv', 'json', or 'pdf' + Terminates the current network process, creates a fresh one with new + communication resources, and restarts the connection. Useful for + recovering from network errors. + + Returns: + Status message + + Example: + >>> # Network issue detected + >>> api.reconnect() + 'Network connection restarted' + + Note: + This creates fresh multiprocessing Events and Queues. Any queued + but unprocessed data in the old network process will be lost. """ - # Ensure filename ends with .csv - if not filename.endswith(".csv"): - filename += ".csv" - - if not os.path.exists(filename): - raise FileNotFoundError(f"❌ File not found: {filename}") - - df = pd.read_csv(filename) - - # Only keep relevant columns (e.g. actual positions) - position_cols = [col for col in df.columns if col.startswith("Receive.RIst.")] - if not position_cols: - raise ValueError("❌ No 'Receive.RIst' position columns found in CSV.") - - report_data = { - "Max Position": df[position_cols].max().to_dict(), - "Mean Position": df[position_cols].mean().to_dict(), - } - - report_base = filename.replace(".csv", "") - output_path = f"{report_base}_report.{format_type.lower()}" - - if format_type == "csv": - pd.DataFrame(report_data).T.to_csv(output_path) - elif format_type == "json": - with open(output_path, "w") as f: - json.dump(report_data, f, indent=4) - elif format_type == "pdf": - fig, ax = plt.subplots() - pd.DataFrame(report_data).T.plot(kind='bar', ax=ax) - ax.set_title("RSI Position Report") - plt.tight_layout() - plt.savefig(output_path) - else: - raise ValueError(f"Unsupported format: {format_type}") - - return f"Report saved as {output_path}" - - def update_variable(self, name, value): - if "." in name: - parent, child = name.split(".", 1) - full_path = f"{parent}.{child}" - if parent in self.client.send_variables: - current = dict(self.client.send_variables[parent]) - # 🛡️ Validate using SafetyManager - safe_value = self.client.safety_manager.validate(full_path, float(value)) - current[child] = safe_value - self.client.send_variables[parent] = current - return f"Updated {name} to {safe_value}" - else: - raise KeyError(f"Parent variable '{parent}' not found in send_variables") - else: - safe_value = self.client.safety_manager.validate(name, float(value)) - self.client.send_variables[name] = safe_value - return f"Updated {name} to {safe_value}" - - def show_variables(self): - """Print available variable names in send and receive variables.""" - def format_grouped(var_dict): - output = [] - for var, val in var_dict.items(): - if isinstance(val, dict): - sub_vars = ', '.join(val.keys()) - output.append(f"{var}: {sub_vars}") - else: - output.append(var) - return output - - send_vars = format_grouped(self.client.send_variables) - receive_vars = format_grouped(self.client.receive_variables) - - print("Send Variables:") - for item in send_vars: - print(f" - {item}") - - print("\nReceive Variables:") - for item in receive_vars: - print(f" - {item}") - - def get_live_data(self): - """Retrieve real-time RSI data for external processing.""" - return { - "position": self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0}), - "velocity": self.client.receive_variables.get("Velocity", {"X": 0, "Y": 0, "Z": 0}), - "acceleration": self.client.receive_variables.get("Acceleration", {"X": 0, "Y": 0, "Z": 0}), - "force": self.client.receive_variables.get("MaCur", {"A1": 0, "A2": 0, "A3": 0, "A4": 0, "A5": 0, "A6": 0}), - "ipoc": self.client.receive_variables.get("IPOC", "N/A") - } - - def get_live_data_as_numpy(self): - data = self.get_live_data() - flat = [] - for section in ["position", "velocity", "acceleration", "force"]: - values = list(data[section].values()) - flat.append(values) - - max_len = max(len(row) for row in flat) - for row in flat: - row.extend([0] * (max_len - len(row))) # Pad missing values - - return np.array(flat) - - def get_live_data_as_dataframe(self): - """Retrieve live RSI data as a Pandas DataFrame.""" - data = self.get_live_data() - return pd.DataFrame([data]) - - def get_ipoc(self): - """Retrieve the latest IPOC value.""" - return self.client.receive_variables.get("IPOC", "N/A") - - def reconnect(self): - """Restart the network connection without stopping RSI.""" self.client.reconnect() - return "Network connection restarted." + logging.info("Network connection restarted") + return "Network connection restarted" - def toggle_digital_io(self, io_group, io_name, state): + def is_running(self) -> bool: """ - Toggle a digital IO variable. - - Args: - io_group (str): Parent variable group (e.g., 'Digout', 'DiO', 'DiL') - io_name (str): IO name or number within the group (e.g., 'o1', '1') - state (bool | int): Desired state (True/False or 1/0) + Check if RSI client is in RUNNING state. Returns: - str: Success or failure message. + True if actively communicating with robot + + Example: + >>> api = RSIAPI() + >>> api.is_running() + False + >>> api.start() + >>> api.is_running() + True """ - var_name = f"{io_group}.{io_name}" - state_value = int(bool(state)) # ensures it's either 1 or 0 - return self.update_variable(var_name, state_value) + return self.client.is_running() - def move_external_axis(self, axis, value): - """Move an external axis.""" - return self.update_variable(f"ELPos.{axis}", value) - - def correct_position(self, correction_type, axis, value): - """Apply correction to RKorr or AKorr.""" - return self.update_variable(f"{correction_type}.{axis}", value) - - def adjust_speed(self, tech_param, value): - """Adjust speed settings (e.g., Tech.T21).""" - return self.update_variable(tech_param, value) - - def reset_variables(self): - """Reset send variables to default values.""" - self.client.reset_send_variables() - return "✅ Send variables reset to default values." - - def show_config_file(self): - """Retrieve key information from config file.""" - return { - "Network": self.client.config_parser.get_network_settings(), - "Send variables": dict(self.client.send_variables), - "Receive variables": dict(self.client.receive_variables) - } - - def start_logging(self, filename=None): - if not filename: - timestamp = datetime.datetime.now().strftime("%d-%m-%Y_%H-%M-%S") - filename = f"logs/{timestamp}.csv" - - self.client.start_logging(filename) - return filename - - def stop_logging(self): - """Stop logging RSI data.""" - self.client.stop_logging() - return "CSV Logging stopped." - - def is_logging_active(self): - """Return logging status.""" - return self.client.is_logging_active() - - @staticmethod - def generate_plot(csv_path: str, plot_type: str = "3d", overlay_path: str = None): + def is_stopped(self) -> bool: """ - Generate a static plot based on RSI CSV data. - - Args: - csv_path (str): Path to the CSV log file. - plot_type (str): Type of plot to generate. Options: - - "3d", "2d_xy", "2d_xz", "2d_yz" - - "position", "velocity", "acceleration" - - "joints", "force", "deviation" - overlay_path (str): Optional CSV file for planned trajectory (used in "deviation" plots). + Check if RSI client is fully stopped. Returns: - str: Status message indicating plot success or failure. + True if in STOPPED state + + Example: + >>> api.stop() + >>> api.is_stopped() + True """ - if not os.path.exists(csv_path): - return f"CSV file not found: {csv_path}" + return self.client.is_stopped() - try: - plot_type = plot_type.lower() + # Deprecated methods for backward compatibility (Phase 5.1 - to be removed) + # These are kept temporarily to ease migration. Use namespaced methods instead. - match plot_type: - case "3d": - StaticPlotter.plot_3d_trajectory(csv_path) - case "2d_xy": - StaticPlotter.plot_2d_projection(csv_path, plane="xy") - case "2d_xz": - StaticPlotter.plot_2d_projection(csv_path, plane="xz") - case "2d_yz": - StaticPlotter.plot_2d_projection(csv_path, plane="yz") - case "position": - StaticPlotter.plot_position_vs_time(csv_path) - case "velocity": - StaticPlotter.plot_velocity_vs_time(csv_path) - case "acceleration": - StaticPlotter.plot_acceleration_vs_time(csv_path) - case "joints": - StaticPlotter.plot_joint_angles(csv_path) - case "force": - StaticPlotter.plot_motor_currents(csv_path) - case "deviation": - if overlay_path is None or not os.path.exists(overlay_path): - return "Deviation plot requires a valid overlay CSV file." - StaticPlotter.plot_deviation(csv_path, overlay_path) - case _: - return f"Invalid plot type '{plot_type}'. Use one of: 3d, 2d_xy, 2d_xz, 2d_yz, position, velocity, acceleration, joints, force, deviation." + def start_rsi(self) -> str: + """DEPRECATED: Use api.start() instead.""" + logging.warning("start_rsi() is deprecated. Use api.start() instead.") + return self.start() - return f"✅ Plot '{plot_type}' generated successfully." - except Exception as e: - return f"Failed to generate plot '{plot_type}': {str(e)}" - - - - def start_live_plot(self, mode="3d", interval=100): - if self.live_plotter and self.live_plotter.running: - return "Live plotting already active." - - def runner(): - self.live_plotter = LivePlotter(self.client, mode=mode, interval=interval) - self.live_plotter.start() - - self.live_plot_thread = Thread(target=runner, daemon=True) - self.live_plot_thread.start() - return f"Live plot started in '{mode}' mode at {interval}ms interval." - - def stop_live_plot(self): - if self.live_plotter and self.live_plotter.running: - self.live_plotter.stop() - return "Live plotting stopped." - return "No live plot is currently running." - - def change_live_plot_mode(self, mode): - if self.live_plotter and self.live_plotter.running: - self.live_plotter.change_mode(mode) - return f"Live plot mode changed to '{mode}'." - return "No live plot is active to change mode." - - - - # ✅ ALERT METHODS - def enable_alerts(self, enable): - """Enable or disable real-time alerts.""" - self.client.enable_alerts(enable) - return f"Alerts {'enabled' if enable else 'disabled'}." - - def override_safety(self, enabled: bool): - self.client.safety_manager.override_safety(enabled) - - def is_safety_overridden(self) -> bool: - return self.client.safety_manager.is_safety_overridden() - - def set_alert_threshold(self, alert_type, value): - """Set threshold for deviation or force alerts.""" - if alert_type in ["deviation", "force"]: - self.client.set_alert_threshold(alert_type, value) - return f"{alert_type.capitalize()} alert threshold set to {value}" - return "Invalid alert type. Use 'deviation' or 'force'." - - @staticmethod - def visualise_csv_log(csv_file, export=False): - """ - Visualize CSV log file directly via RSIAPI. - - Args: - csv_file (str): Path to CSV log file. - export (bool): Whether to export the plots. - """ - visualizer = KukaRSIVisualiser(csv_file) - visualizer.plot_trajectory() - visualizer.plot_joint_positions() - visualizer.plot_force_trends() - - if export: - visualizer.export_graphs() - - @staticmethod - def parse_krl_to_csv(src_file, dat_file, output_file): - """ - Parses KRL files (.src, .dat) and exports coordinates to CSV. - - Args: - src_file (str): Path to KRL .src file. - dat_file (str): Path to KRL .dat file. - output_file (str): Path for output CSV file. - """ - try: - parser = KRLParser(src_file, dat_file) - parser.parse_src() - parser.parse_dat() - parser.export_csv(output_file) - return f"KRL data successfully exported to {output_file}" - except Exception as e: - return f"Error parsing KRL files: {e}" - - @staticmethod - def inject_rsi(input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"): - """ - Inject RSI commands into a KRL (.src) program file. - - Args: - input_krl (str): Path to the input KRL file. - output_krl (str, optional): Path to the output file (defaults to overwriting input). - rsi_config (str, optional): RSI configuration file name. - """ - try: - inject_rsi_to_krl(input_krl, output_krl, rsi_config) - output_path = output_krl if output_krl else input_krl - return f"RSI successfully injected into {output_path}" - except Exception as e: - return f"RSI injection failed: {e}" - - @staticmethod - def generate_trajectory(start, end, steps=100, space="cartesian", mode="absolute", include_resets=False): - """Generates a linear trajectory (Cartesian or Joint).""" - return generate_trajectory(start, end, steps, space, mode, include_resets) - - import asyncio - - def execute_trajectory(self, trajectory, space="cartesian", rate=0.012): - """ - Executes a trajectory intelligently: - - If already inside an asyncio loop -> schedules task in background - - If no loop -> creates one and runs blocking - """ - - async def runner(): - for idx, point in enumerate(trajectory): - if space == "cartesian": - self.update_cartesian(**point) - elif space == "joint": - self.update_joints(**point) - else: - raise ValueError("space must be 'cartesian' or 'joint'") - print(f"Step {idx + 1}/{len(trajectory)} sent") - await asyncio.sleep(rate) - - try: - loop = asyncio.get_running_loop() - # If inside event loop, schedule runner as background task - asyncio.create_task(runner()) - except RuntimeError: - # If no event loop is running, create and run one - asyncio.run(runner()) - - def queue_trajectory(self, trajectory, space="cartesian", rate=0.012): - """Adds a trajectory to the internal queue.""" - self.trajectory_queue.append({ - "trajectory": trajectory, - "space": space, - "rate": rate, - }) - - def clear_trajectory_queue(self): - """Clears all queued trajectories.""" - self.trajectory_queue.clear() - - def get_trajectory_queue(self): - """Returns current queued trajectories (metadata only).""" - return [ - {"space": item["space"], "steps": len(item["trajectory"]), "rate": item["rate"]} - for item in self.trajectory_queue - ] - - def execute_queued_trajectories(self): - """Executes all queued trajectories in order.""" - for item in self.trajectory_queue: - self.execute_trajectory(item["trajectory"], item["space"], item["rate"]) - self.clear_trajectory_queue() - - def export_movement_data(self, filename="movement_log.csv"): - """ - Exports recorded movement data (if available) to a CSV file. - Assumes self.client.logger has stored entries. - """ - if not hasattr(self.client, "logger") or self.client.logger is None: - raise RuntimeError("No logger attached to RSI client.") - - data = self.client.get_movement_data() - if not data: - raise RuntimeError("No data available to export.") - - df = pd.DataFrame(data) - df.to_csv(filename, index=False) - return f"Movement data exported to {filename}" - - @staticmethod - def compare_test_runs(file1, file2): - """ - Compares two test run CSV files. - Returns a summary of average and max deviation for each axis. - """ - import pandas as pd - - df1 = pd.read_csv(file1) - df2 = pd.read_csv(file2) - - shared_cols = [col for col in df1.columns if col in df2.columns and col.startswith("Receive.RIst")] - diffs = {} - - for col in shared_cols: - delta = abs(df1[col] - df2[col]) - diffs[col] = { - "mean_diff": delta.mean(), - "max_diff": delta.max(), - } - - return diffs - - def update_cartesian(self, **kwargs): - """ - Update Cartesian correction values (RKorr). - """ - self._ensure_client() - if "RKorr" not in self.client.send_variables: - logging.warning("Warning: RKorr not configured in send_variables. Skipping Cartesian update.") - return - - for axis, value in kwargs.items(): - self.update_variable(f"RKorr.{axis}", float(value)) - - def update_joints(self, **kwargs): - """ - Update joint correction values (AKorr). - """ - self._ensure_client() - if "AKorr" not in self.client.send_variables: - logging.warning("⚠️ Warning: AKorr not configured in send_variables. Skipping Joint update.") - return - - for axis, value in kwargs.items(): - self.update_variable(f"AKorr.{axis}", float(value)) - - def watch_network(self, duration: float = None, rate: float = 0.2): - """ - Continuously prints current receive variables (and IPOC). - If duration is None, runs until interrupted. - """ - import time - import datetime - - logging.info("Watching network... Press Ctrl+C to stop.\n") - start_time = time.time() - - try: - while True: - live_data = self.get_live_data() - ipoc = live_data.get("IPOC", "N/A") - rpos = live_data.get("RIst", {}) - print(f"[{datetime.datetime.now().strftime('%H:%M:%S')}] IPOC: {ipoc} | RIst: {rpos}") - time.sleep(rate) - - if duration and (time.time() - start_time) >= duration: - break - - except KeyboardInterrupt: - logging.info("\nStopped network watch.") - - def move_cartesian_trajectory(self, start_pose, end_pose, steps=50, rate=0.012): - """ - Generate and execute a Cartesian (TCP) movement between two poses. - Args: - start_pose (dict): e.g. {"X":0, "Y":0, "Z":500} - end_pose (dict): e.g. {"X":100, "Y":0, "Z":500} - steps (int): Number of interpolation points. - rate (float): Time between points in seconds. - """ - trajectory = self.generate_trajectory(start_pose, end_pose, steps=steps, space="cartesian") - self.execute_trajectory(trajectory, space="cartesian", rate=rate) - - def move_joint_trajectory(self, start_joints, end_joints, steps=50, rate=0.4): - """ - Generate and execute a Joint-space movement between two poses. - Args: - start_joints (dict): e.g. {"A1":0, "A2":0, "A3":0, ...} - end_joints (dict): e.g. {"A1":90, "A2":0, "A3":0, ...} - steps (int): Number of interpolation points. - rate (float): Time between points in seconds. - """ - trajectory = self.generate_trajectory(start_joints, end_joints, steps=steps, space="joint") - self.execute_trajectory(trajectory, space="joint", rate=rate) - - def queue_cartesian_trajectory(self, start_pose, end_pose, steps=50, rate=0.012): - """ - Generate and queue a Cartesian movement (no execution). - """ - if not isinstance(start_pose, dict) or not isinstance(end_pose, dict): - raise ValueError("start_pose and end_pose must be dictionaries (e.g., {'X': 0, 'Y': 0, 'Z': 500})") - if steps <= 0: - raise ValueError("Steps must be greater than zero.") - if rate <= 0: - raise ValueError("Rate must be greater than zero.") - - trajectory = self.generate_trajectory(start_pose, end_pose, steps=steps, space="cartesian") - self.queue_trajectory(trajectory, "cartesian", rate) - - def queue_joint_trajectory(self, start_joints, end_joints, steps=50, rate=0.4): - """ - Generate and queue a Joint-space movement (no execution). - """ - if not isinstance(start_joints, dict) or not isinstance(end_joints, dict): - raise ValueError("start_joints and end_joints must be dictionaries (e.g., {'A1': 0, 'A2': 0})") - if steps <= 0: - raise ValueError("Steps must be greater than zero.") - if rate <= 0: - raise ValueError("Rate must be greater than zero.") - - trajectory = self.generate_trajectory(start_joints, end_joints, steps=steps, space="joint") - self.queue_trajectory(trajectory, "joint", rate) - - # --- 🛡️ Safety Management --- - - def safety_stop(self): - """Trigger emergency stop.""" - self._ensure_client() - self.client.safety_manager.emergency_stop() - - def safety_reset(self): - """Reset emergency stop.""" - self._ensure_client() - self.client.safety_manager.reset_stop() - - def safety_status(self): - """Return detailed safety status.""" - self._ensure_client() - sm = self.client.safety_manager - return { - "emergency_stop": sm.is_stopped(), - "safety_override": self.is_safety_overridden(), - "limits": sm.get_limits(), - } - - def safety_set_limit(self, variable, lower, upper): - """Set new safety limit bounds for a specific variable.""" - self._ensure_client() - self.client.safety_manager.set_limit(variable, float(lower), float(upper)) + def stop_rsi(self) -> str: + """DEPRECATED: Use api.stop() instead.""" + logging.warning("stop_rsi() is deprecated. Use api.stop() instead.") + return self.stop() diff --git a/src/RSIPI/rsi_client.py b/src/RSIPI/rsi_client.py index c7664da..9a8eb9e 100644 --- a/src/RSIPI/rsi_client.py +++ b/src/RSIPI/rsi_client.py @@ -2,11 +2,12 @@ import logging import multiprocessing import time from enum import Enum, auto -from threading import Lock +from threading import Lock, Thread +from typing import Optional from .config_parser import ConfigParser from .network_handler import NetworkProcess from .safety_manager import SafetyManager -import threading +from .exceptions import RSIStateError, RSIInvalidTransition, RSIClientNotReady class ClientState(Enum): @@ -32,29 +33,36 @@ class RSIClient: ClientState.ERROR: {ClientState.STOPPING, ClientState.INITIALIZED}, # Via reconnect } - def __init__(self, config_file, rsi_limits_file=None): + def __init__(self, config_file: str, rsi_limits_file: Optional[str] = None) -> None: + """ + Initialize RSI client with configuration and safety limits. + + Args: + config_file: Path to RSI_EthernetConfig.xml + rsi_limits_file: Optional path to .rsi.xml safety limits file + """ logging.info(f"Loading RSI configuration from {config_file}...") - self._state = ClientState.INITIALIZED - self._state_lock = Lock() + self._state: ClientState = ClientState.INITIALIZED + self._state_lock: Lock = Lock() - self.config_parser = ConfigParser(config_file, rsi_limits_file) + self.config_parser: ConfigParser = ConfigParser(config_file, rsi_limits_file) network_settings = self.config_parser.get_network_settings() - self.manager = multiprocessing.Manager() + self.manager: multiprocessing.Manager = multiprocessing.Manager() self.send_variables = self.manager.dict(self.config_parser.send_variables) self.receive_variables = self.manager.dict(self.config_parser.receive_variables) - self.stop_event = multiprocessing.Event() - self.start_event = multiprocessing.Event() - self.command_queue = multiprocessing.Queue() + self.stop_event: multiprocessing.Event = multiprocessing.Event() + self.start_event: multiprocessing.Event = multiprocessing.Event() + self.command_queue: multiprocessing.Queue = multiprocessing.Queue() - self.safety_manager = SafetyManager(self.config_parser.safety_limits) + self.safety_manager: SafetyManager = SafetyManager(self.config_parser.safety_limits) # Shared logging state (readable from parent process) self._logging_active = multiprocessing.Value('b', False) # Create NetworkProcess but don't start communication yet - self.network_process = NetworkProcess( + self.network_process: NetworkProcess = NetworkProcess( network_settings["ip"], network_settings["port"], self.send_variables, @@ -68,9 +76,9 @@ class RSIClient: self.network_process.logging_active = self._logging_active self.network_process.start() - self.logger = None - self.running = False - self.thread = None + self.logger: Optional[any] = None # Reserved for future use + self.running: bool = False + self.thread: Optional[Thread] = None @property def state(self) -> ClientState: @@ -82,8 +90,11 @@ class RSIClient: """ Attempt to transition to a new state. + Args: + new_state: Target state to transition to + Returns: - True if transition was valid and completed, False otherwise. + True if transition was valid and completed, False otherwise """ with self._state_lock: if new_state in self._VALID_TRANSITIONS.get(self._state, set()): @@ -97,18 +108,28 @@ class RSIClient: ) return False - def start(self): - """Send start signal to NetworkProcess and run control loop.""" + def start(self) -> None: + """ + Send start signal to NetworkProcess and run control loop. + + Transitions through STARTING → RUNNING states and maintains + control loop until stopped. + + Raises: + RSIClientNotReady: If client is not in appropriate state to start + """ if not self._transition_to(ClientState.STARTING): - logging.error("Cannot start: invalid state") - return + error_msg = f"Cannot start from state {self.state.name}" + logging.error(error_msg) + raise RSIClientNotReady(error_msg) logging.info("RSIClient sending start signal to NetworkProcess...") self.start_event.set() if not self._transition_to(ClientState.RUNNING): - logging.error("Failed to transition to RUNNING state") - return + error_msg = "Failed to transition to RUNNING state" + logging.error(error_msg) + raise RSIStateError(error_msg) self.running = True logging.info("RSI Client Started") @@ -121,8 +142,9 @@ class RSIClient: except Exception as e: logging.error(f"RSI Client encountered an error: {e}") self._transition_to(ClientState.ERROR) + raise - def stop(self): + def stop(self) -> None: """Stop the network process and the client thread safely.""" if self.state in (ClientState.STOPPED, ClientState.STOPPING): logging.debug("Already stopped or stopping") @@ -151,8 +173,13 @@ class RSIClient: self._transition_to(ClientState.STOPPED) logging.info("RSI Client Stopped") - def reconnect(self): - """Reconnects the network process safely.""" + def reconnect(self) -> None: + """ + Reconnect the network process safely. + + Stops existing connection, resets state, and creates fresh + network process with new communication resources. + """ logging.info("Reconnecting RSI Client network...") # Stop if currently running @@ -189,25 +216,45 @@ class RSIClient: self.network_process.start() # Fresh control thread - self.thread = threading.Thread(target=self.start, daemon=True) + self.thread = Thread(target=self.start, daemon=True) self.thread.start() def is_running(self) -> bool: - """Check if client is in running state.""" + """ + Check if client is in running state. + + Returns: + True if currently running + """ return self.state == ClientState.RUNNING def is_stopped(self) -> bool: - """Check if client is fully stopped.""" + """ + Check if client is fully stopped. + + Returns: + True if in STOPPED state + """ return self.state == ClientState.STOPPED - def start_logging(self, filename): - """Start CSV logging to the specified file.""" + def start_logging(self, filename: str) -> None: + """ + Start CSV logging to the specified file. + + Args: + filename: Path to output CSV file + """ self.command_queue.put({'action': 'start_logging', 'filename': filename}) - def stop_logging(self): + def stop_logging(self) -> None: """Stop CSV logging.""" self.command_queue.put({'action': 'stop_logging'}) def is_logging_active(self) -> bool: - """Check if CSV logging is currently active.""" + """ + Check if CSV logging is currently active. + + Returns: + True if logging is active + """ return self._logging_active.value diff --git a/src/RSIPI/safety_api.py b/src/RSIPI/safety_api.py new file mode 100644 index 0000000..57d248c --- /dev/null +++ b/src/RSIPI/safety_api.py @@ -0,0 +1,149 @@ +"""Safety management API namespace for RSIPI.""" + +import logging +from typing import Dict, Any, TYPE_CHECKING + +if TYPE_CHECKING: + from .rsi_client import RSIClient + + +class SafetyAPI: + """ + Safety management interface for KUKA RSI robot control. + + Provides emergency stop control, limit configuration, and safety status monitoring. + All limits are enforced by the SafetyManager before values are sent to the robot. + """ + + def __init__(self, client: 'RSIClient') -> None: + """ + Initialize SafetyAPI namespace. + + Args: + client: RSIClient instance for accessing safety manager + """ + self.client = client + + def stop(self) -> None: + """ + Trigger emergency stop. + + Activates the safety manager's E-stop flag, blocking all motion commands + until reset() is called. This is a software-level safety mechanism. + + Note: + This is NOT a hardware E-stop and should not be relied upon for + safety-critical applications. Always use proper hardware E-stops. + """ + self.client.safety_manager.emergency_stop() + logging.critical("Emergency stop activated via SafetyAPI") + + def reset(self) -> None: + """ + Reset emergency stop and resume normal operation. + + Clears the E-stop flag, allowing motion commands to proceed again. + Use with caution after ensuring the robot workspace is safe. + """ + self.client.safety_manager.reset_stop() + logging.info("Emergency stop reset via SafetyAPI") + + def status(self) -> Dict[str, Any]: + """ + Get comprehensive safety status information. + + Returns: + Dictionary containing: + - emergency_stop (bool): Whether E-stop is active + - safety_override (bool): Whether safety checks are bypassed + - limits (Dict[str, Tuple[float, float]]): Configured limits + + Example: + >>> status = api.safety.status() + >>> print(status['emergency_stop']) + False + >>> print(status['limits']['RKorr.X']) + (-100.0, 100.0) + """ + sm = self.client.safety_manager + return { + "emergency_stop": sm.is_stopped(), + "safety_override": sm.is_safety_overridden(), + "limits": sm.get_limits(), + } + + def set_limit(self, variable: str, lower: float, upper: float) -> None: + """ + Set or update safety limit bounds for a specific variable. + + Args: + variable: Variable path (e.g., 'RKorr.X', 'AKorr.A1') + lower: Minimum allowed value + upper: Maximum allowed value + + Raises: + ValueError: If lower >= upper + + Example: + >>> api.safety.set_limit('RKorr.X', -50.0, 50.0) + >>> api.safety.set_limit('AKorr.A1', -10.0, 10.0) + """ + if lower >= upper: + raise ValueError(f"Lower limit ({lower}) must be less than upper limit ({upper})") + + self.client.safety_manager.set_limit(variable, float(lower), float(upper)) + logging.info(f"Safety limit set for {variable}: [{lower}, {upper}]") + + def get_limits(self) -> Dict[str, tuple[float, float]]: + """ + Get all configured safety limits. + + Returns: + Dictionary mapping variable paths to (lower, upper) limit tuples + + Example: + >>> limits = api.safety.get_limits() + >>> for var, (lower, upper) in limits.items(): + ... print(f"{var}: [{lower}, {upper}]") + """ + return self.client.safety_manager.get_limits() + + def override(self, enable: bool) -> None: + """ + Enable or disable safety limit override. + + WARNING: Use with EXTREME CAUTION. When enabled, all safety limit + validation is bypassed, allowing potentially dangerous motion commands. + + Args: + enable: True to bypass safety checks, False to re-enable + + Example: + >>> # Temporarily disable limits for calibration + >>> api.safety.override(True) + >>> # ... perform calibration ... + >>> api.safety.override(False) # Re-enable safety + """ + self.client.safety_manager.override_safety(enable) + if enable: + logging.warning("⚠️ SAFETY OVERRIDE ENABLED - All limit checks bypassed!") + else: + logging.info("Safety override disabled - limits re-enabled") + + def is_stopped(self) -> bool: + """ + Check if emergency stop is currently active. + + Returns: + True if E-stop is active, blocking all motion + """ + return self.client.safety_manager.is_stopped() + + def is_overridden(self) -> bool: + """ + Check if safety limit validation is currently bypassed. + + Returns: + True if safety checks are disabled + """ + return self.client.safety_manager.is_safety_overridden() diff --git a/src/RSIPI/safety_manager.py b/src/RSIPI/safety_manager.py index 721a0d9..1e03beb 100644 --- a/src/RSIPI/safety_manager.py +++ b/src/RSIPI/safety_manager.py @@ -1,4 +1,6 @@ import logging +from typing import Dict, Tuple, Optional +from .exceptions import RSISafetyViolation, RSIEmergencyStop, RSILimitExceeded class SafetyManager: @@ -11,71 +13,131 @@ class SafetyManager: - Runtime limit updates """ - def __init__(self, limits=None): + def __init__(self, limits: Optional[Dict[str, Tuple[float, float]]] = None) -> None: """ + Initialize SafetyManager with optional safety limits. + Args: - limits (dict): Optional safety limits in the form: + limits: Optional safety limits in the form: { 'RKorr.X': (-5.0, 5.0), 'AKorr.A1': (-6.0, 6.0), ... } """ - self.limits = limits if limits is not None else {} - self.e_stop = False - self.last_values = {} # Reserved for future tracking or override detection - self.override = False # ➡️ Track if safety checks are overridden + self.limits: Dict[str, Tuple[float, float]] = limits if limits is not None else {} + self.e_stop: bool = False + self.last_values: Dict[str, float] = {} # Reserved for future tracking or override detection + self.override: bool = False # Track if safety checks are overridden def validate(self, path: str, value: float) -> float: + """ + Validate a value against safety limits and emergency stop state. + + Args: + path: Variable path (e.g., 'RKorr.X', 'AKorr.A1') + value: Value to validate + + Returns: + Validated value (unchanged if valid) + + Raises: + RSIEmergencyStop: If emergency stop is active + RSILimitExceeded: If value exceeds configured limits + """ if self.override: # Bypass all safety checks when override is active return value if self.e_stop: logging.warning(f"SafetyManager: {path} update blocked (E-STOP active)") - raise RuntimeError(f"SafetyManager: E-STOP active. Motion blocked for {path}.") + raise RSIEmergencyStop(f"E-STOP active. Motion blocked for {path}.") if path in self.limits: min_val, max_val = self.limits[path] if not (min_val <= value <= max_val): logging.warning(f"SafetyManager: {path}={value} blocked (out of bounds {min_val} to {max_val})") - raise ValueError(f"SafetyManager: {path}={value} is out of bounds ({min_val} to {max_val})") + raise RSILimitExceeded(f"{path}={value} is out of bounds ({min_val} to {max_val})") return value - def emergency_stop(self): - """Activates emergency stop: all motion validation will fail.""" + def emergency_stop(self) -> None: + """Activate emergency stop: all motion validation will fail.""" self.e_stop = True + logging.critical("Emergency stop activated") - def reset_stop(self): - """Resets emergency stop, allowing motion again.""" + def reset_stop(self) -> None: + """Reset emergency stop, allowing motion again.""" self.e_stop = False + logging.info("Emergency stop reset") - def set_limit(self, path: str, min_val: float, max_val: float): - """Sets or overrides a safety limit at runtime.""" + def set_limit(self, path: str, min_val: float, max_val: float) -> None: + """ + Set or override a safety limit at runtime. + + Args: + path: Variable path (e.g., 'RKorr.X') + min_val: Minimum allowed value + max_val: Maximum allowed value + """ self.limits[path] = (min_val, max_val) + logging.info(f"Safety limit updated: {path} = ({min_val}, {max_val})") - def get_limits(self): - """Returns a copy of all current safety limits.""" + def get_limits(self) -> Dict[str, Tuple[float, float]]: + """ + Get a copy of all current safety limits. + + Returns: + Dictionary mapping variable paths to (min, max) tuples + """ return self.limits.copy() - def is_stopped(self): - """Returns whether the emergency stop is active.""" + def is_stopped(self) -> bool: + """ + Check if emergency stop is active. + + Returns: + True if emergency stop is active + """ return self.e_stop - def override_safety(self, enable: bool): - """Enable or disable safety override (bypass all checks).""" + def override_safety(self, enable: bool) -> None: + """ + Enable or disable safety override (bypass all checks). + + Args: + enable: True to enable override, False to disable + + Warning: + Use with extreme caution. All safety checks are bypassed when enabled. + """ self.override = enable + if enable: + logging.warning("⚠️ SAFETY OVERRIDE ENABLED - All safety checks bypassed!") + else: + logging.info("Safety override disabled") def is_safety_overridden(self) -> bool: - """Returns whether safety override is active.""" + """ + Check if safety override is active. + + Returns: + True if safety checks are bypassed + """ return self.override @staticmethod - def check_cartesian_limits(pose): + def check_cartesian_limits(pose: Dict[str, float]) -> bool: """ Check if a Cartesian pose is within general robot limits. + Typical bounds: ±1500 mm in XYZ, ±360° in orientation. + + Args: + pose: Dictionary with keys like 'X', 'Y', 'Z', 'A', 'B', 'C' + + Returns: + True if pose is within limits, False otherwise """ limits = { "X": (-1500, 1500), @@ -91,10 +153,17 @@ class SafetyManager: return True @staticmethod - def check_joint_limits(pose): + def check_joint_limits(pose: Dict[str, float]) -> bool: """ Check if a joint-space pose is within KUKA limits. + Typical KUKA ranges: A1–A6 in defined degrees. + + Args: + pose: Dictionary with keys like 'A1', 'A2', ..., 'A6' + + Returns: + True if pose is within limits, False otherwise """ limits = { "A1": (-185, 185), diff --git a/src/RSIPI/tools_api.py b/src/RSIPI/tools_api.py new file mode 100644 index 0000000..b684b09 --- /dev/null +++ b/src/RSIPI/tools_api.py @@ -0,0 +1,291 @@ +"""Utility tools API namespace for RSIPI.""" + +import logging +import os +import json +from typing import Dict, Any, Union, TYPE_CHECKING +import pandas as pd +import matplotlib.pyplot as plt + +if TYPE_CHECKING: + from .rsi_client import RSIClient + + +class ToolsAPI: + """ + Utility tools interface for KUKA RSI robot control. + + Provides debugging, inspection, data comparison, and reporting utilities + for analyzing RSI performance and robot behavior. + """ + + def __init__(self, client: 'RSIClient') -> None: + """ + Initialize ToolsAPI namespace. + + Args: + client: RSIClient instance for variable access + """ + self.client = client + + def update_variable(self, name: str, value: Union[float, int]) -> str: + """ + Low-level variable update with safety validation. + + Direct access to send_variables for advanced users. Most users should + use higher-level methods like api.motion.update_cartesian() instead. + + Args: + name: Variable name (e.g., 'IPOC', 'RKorr.X', 'Tech.C11') + value: New value to set + + Returns: + Status message + + Raises: + RSIVariableError: If variable not found + RSISafetyViolation: If value violates safety limits + + Example: + >>> api.tools.update_variable('RKorr.X', 10.0) + 'Updated RKorr.X to 10.0' + >>> api.tools.update_variable('Tech.C11', 42) + 'Updated Tech.C11 to 42.0' + + Note: + This bypasses higher-level abstractions and directly modifies + send_variables. Use with caution and prefer namespace-specific + methods when available. + """ + from .exceptions import RSIVariableError + + if "." in name: + parent, child = name.split(".", 1) + full_path = f"{parent}.{child}" + + if parent in self.client.send_variables: + current = dict(self.client.send_variables[parent]) + # Validate using SafetyManager + safe_value = self.client.safety_manager.validate(full_path, float(value)) + current[child] = safe_value + self.client.send_variables[parent] = current + logging.debug(f"Updated {name} to {safe_value}") + return f"Updated {name} to {safe_value}" + else: + raise RSIVariableError(f"Parent variable '{parent}' not found in send_variables") + else: + # Top-level variable + safe_value = self.client.safety_manager.validate(name, float(value)) + self.client.send_variables[name] = safe_value + logging.debug(f"Updated {name} to {safe_value}") + return f"Updated {name} to {safe_value}" + + def show_variables(self) -> None: + """ + Print available send/receive variables to console. + + Displays a formatted list of all configured RSI variables with their + nested structure. Useful for debugging and discovering available data. + + Example: + >>> api.tools.show_variables() + Send Variables: + - IPOC + - RKorr: X, Y, Z, A, B, C + - AKorr: A1, A2, A3, A4, A5, A6 + - Tech: C11, C12, C13, ... T11, T12, ... + + Receive Variables: + - IPOC + - RIst: X, Y, Z, A, B, C + - RSol: X, Y, Z, A, B, C + - ASPos: A1, A2, A3, A4, A5, A6 + - MaCur: A1, A2, A3, A4, A5, A6 + """ + def format_grouped(var_dict): + output = [] + for var, val in var_dict.items(): + if isinstance(val, dict): + sub_vars = ', '.join(val.keys()) + output.append(f"{var}: {sub_vars}") + else: + output.append(var) + return output + + send_vars = format_grouped(self.client.send_variables) + receive_vars = format_grouped(self.client.receive_variables) + + print("\nSend Variables:") + for item in send_vars: + print(f" - {item}") + + print("\nReceive Variables:") + for item in receive_vars: + print(f" - {item}") + print() + + def show_config(self) -> Dict[str, Any]: + """ + Retrieve configuration information. + + Returns network settings and current variable structure from the + active RSI configuration. + + Returns: + Dictionary containing: + - Network: IP, port, sentype, onlysend settings + - Send variables: Current send variable structure + - Receive variables: Current receive variable structure + + Example: + >>> config = api.tools.show_config() + >>> print(config['Network']) + {'ip': '192.168.1.100', 'port': 49152, 'sentype': 'ImFree', 'onlysend': False} + >>> print(config['Send variables'].keys()) + dict_keys(['IPOC', 'RKorr', 'AKorr', 'Tech']) + """ + return { + "Network": self.client.config_parser.get_network_settings(), + "Send variables": dict(self.client.send_variables), + "Receive variables": dict(self.client.receive_variables) + } + + def reset_variables(self) -> str: + """ + Reset send variables to default values. + + Calls the client's reset_send_variables() method if available, + otherwise returns a not-implemented message. + + Returns: + Status message + + Example: + >>> api.tools.reset_variables() + 'Send variables reset to default values' + + Note: + This typically resets correction values (RKorr, AKorr) to zero + and restores default Tech variable values. IPOC is not affected. + """ + if hasattr(self.client, 'reset_send_variables'): + self.client.reset_send_variables() + logging.info("Send variables reset to defaults") + return "Send variables reset to default values" + else: + return "reset_send_variables() not implemented on client" + + @staticmethod + def generate_report(filename: str, format_type: str = "csv") -> str: + """ + Generate statistical report from CSV log file. + + Analyzes recorded RSI data and produces summary statistics for + position, velocity, and other metrics. + + Args: + filename: Path to CSV log file (with or without .csv extension) + format_type: Output format - 'csv', 'json', or 'pdf' + + Returns: + Path to generated report file + + Raises: + FileNotFoundError: If CSV file doesn't exist + ValueError: If format_type is unsupported or CSV has no position data + + Example: + >>> api.tools.generate_report('logs/test_run.csv', 'pdf') + 'Report saved as logs/test_run_report.pdf' + + Note: + PDF reports include bar charts of max/mean position values. + CSV and JSON formats provide tabular statistical data. + """ + # Ensure filename ends with .csv + if not filename.endswith(".csv"): + filename += ".csv" + + if not os.path.exists(filename): + raise FileNotFoundError(f"File not found: {filename}") + + df = pd.read_csv(filename) + + # Extract position columns + position_cols = [col for col in df.columns if col.startswith("Receive.RIst.")] + if not position_cols: + raise ValueError("No 'Receive.RIst' position columns found in CSV") + + report_data = { + "Max Position": df[position_cols].max().to_dict(), + "Mean Position": df[position_cols].mean().to_dict(), + } + + report_base = filename.replace(".csv", "") + output_path = f"{report_base}_report.{format_type.lower()}" + + if format_type == "csv": + pd.DataFrame(report_data).T.to_csv(output_path) + elif format_type == "json": + with open(output_path, "w") as f: + json.dump(report_data, f, indent=4) + elif format_type == "pdf": + fig, ax = plt.subplots() + pd.DataFrame(report_data).T.plot(kind='bar', ax=ax) + ax.set_title("RSI Position Report") + plt.tight_layout() + plt.savefig(output_path) + plt.close(fig) + else: + raise ValueError(f"Unsupported format: {format_type}. Use 'csv', 'json', or 'pdf'.") + + logging.info(f"Report generated: {output_path}") + return f"Report saved as {output_path}" + + @staticmethod + def compare_runs(file1: str, file2: str) -> Dict[str, Dict[str, float]]: + """ + Compare two test run CSV files. + + Calculates mean and max deviation between corresponding position + columns in two log files. Useful for repeatability analysis. + + Args: + file1: Path to first CSV log file + file2: Path to second CSV log file + + Returns: + Dictionary mapping column names to deviation statistics: + - mean_diff: Average absolute difference + - max_diff: Maximum absolute difference + + Raises: + FileNotFoundError: If either file doesn't exist + + Example: + >>> diffs = api.tools.compare_runs('run1.csv', 'run2.csv') + >>> for col, stats in diffs.items(): + ... print(f"{col}: mean={stats['mean_diff']:.3f}, max={stats['max_diff']:.3f}") + Receive.RIst.X: mean=0.234, max=1.456 + Receive.RIst.Y: mean=0.178, max=0.892 + Receive.RIst.Z: mean=0.312, max=1.023 + + Note: + Only compares columns present in both files. Typically used for + comparing repeatability of the same motion program. + """ + df1 = pd.read_csv(file1) + df2 = pd.read_csv(file2) + + shared_cols = [col for col in df1.columns if col in df2.columns and col.startswith("Receive.RIst")] + diffs = {} + + for col in shared_cols: + delta = abs(df1[col] - df2[col]) + diffs[col] = { + "mean_diff": float(delta.mean()), + "max_diff": float(delta.max()), + } + + logging.info(f"Compared {len(shared_cols)} position columns between runs") + return diffs diff --git a/src/RSIPI/viz_api.py b/src/RSIPI/viz_api.py new file mode 100644 index 0000000..4877f13 --- /dev/null +++ b/src/RSIPI/viz_api.py @@ -0,0 +1,271 @@ +"""Visualization API namespace for RSIPI.""" + +import logging +import os +from typing import Optional, TYPE_CHECKING +from threading import Thread + +if TYPE_CHECKING: + from .rsi_client import RSIClient + + +class VizAPI: + """ + Data visualization interface for KUKA RSI robot control. + + Provides static plot generation from CSV logs and live plotting + capabilities for real-time monitoring of robot motion. + """ + + def __init__(self, client: 'RSIClient') -> None: + """ + Initialize VizAPI namespace. + + Args: + client: RSIClient instance for live data access + """ + self.client = client + self.live_plotter = None + self.live_plot_thread = None + + @staticmethod + def plot_static(csv_path: str, plot_type: str = "3d", overlay_path: Optional[str] = None) -> str: + """ + Generate static plots from CSV log data. + + Creates matplotlib visualizations of recorded robot trajectories, + velocities, forces, and other metrics. + + Args: + csv_path: Path to CSV log file + plot_type: Type of plot to generate: + - "3d": 3D trajectory visualization + - "2d_xy", "2d_xz", "2d_yz": 2D projections + - "position": Position vs time + - "velocity": Velocity vs time + - "acceleration": Acceleration vs time + - "joints": Joint angles vs time + - "force": Motor currents vs time + - "deviation": Deviation from planned path (requires overlay_path) + overlay_path: Optional CSV with planned trajectory for deviation plots + + Returns: + Status message + + Raises: + FileNotFoundError: If CSV file doesn't exist + ValueError: If plot_type is invalid + + Example: + >>> api.viz.plot_static('logs/test.csv', '3d') + 'Plot 3d generated successfully' + + >>> api.viz.plot_static('logs/actual.csv', 'deviation', 'logs/planned.csv') + 'Plot deviation generated successfully' + + Note: + Plots are displayed interactively. Close the plot window to continue. + """ + if not os.path.exists(csv_path): + return f"CSV file not found: {csv_path}" + + try: + from .static_plotter import StaticPlotter + + plot_type = plot_type.lower() + + match plot_type: + case "3d": + StaticPlotter.plot_3d_trajectory(csv_path) + case "2d_xy": + StaticPlotter.plot_2d_projection(csv_path, plane="xy") + case "2d_xz": + StaticPlotter.plot_2d_projection(csv_path, plane="xz") + case "2d_yz": + StaticPlotter.plot_2d_projection(csv_path, plane="yz") + case "position": + StaticPlotter.plot_position_vs_time(csv_path) + case "velocity": + StaticPlotter.plot_velocity_vs_time(csv_path) + case "acceleration": + StaticPlotter.plot_acceleration_vs_time(csv_path) + case "joints": + StaticPlotter.plot_joint_angles(csv_path) + case "force": + StaticPlotter.plot_motor_currents(csv_path) + case "deviation": + if overlay_path is None or not os.path.exists(overlay_path): + return "Deviation plot requires a valid overlay CSV file" + StaticPlotter.plot_deviation(csv_path, overlay_path) + case _: + valid_types = "3d, 2d_xy, 2d_xz, 2d_yz, position, velocity, acceleration, joints, force, deviation" + return f"Invalid plot type '{plot_type}'. Use one of: {valid_types}" + + logging.info(f"Generated {plot_type} plot from {csv_path}") + return f"Plot '{plot_type}' generated successfully" + except Exception as e: + logging.error(f"Plot generation failed: {e}") + return f"Failed to generate plot '{plot_type}': {str(e)}" + + def start_live_plot(self, mode: str = "3d", interval: int = 100) -> str: + """ + Start live plotting of robot data. + + Opens an interactive matplotlib window that updates in real-time + as robot data is received. Useful for monitoring during operation. + + Args: + mode: Plot mode: + - "3d": Live 3D trajectory + - "position": Position vs time + - "velocity": Velocity vs time + - "force": Motor currents vs time + interval: Update interval in milliseconds (default: 100ms = 10Hz) + + Returns: + Status message + + Example: + >>> api.viz.start_live_plot('3d', interval=100) + 'Live plot started in 3d mode at 100ms interval' + + >>> # Watch robot move in real-time + >>> # Close plot window or call stop_live_plot() to end + + Note: + Live plotting runs in a background thread. The plot window must + remain open for updates to continue. + """ + if self.live_plotter and hasattr(self.live_plotter, 'running') and self.live_plotter.running: + return "Live plotting already active" + + def runner(): + from .live_plotter import LivePlotter + self.live_plotter = LivePlotter(self.client, mode=mode, interval=interval) + self.live_plotter.start() + + self.live_plot_thread = Thread(target=runner, daemon=True) + self.live_plot_thread.start() + logging.info(f"Live plot started: {mode} mode at {interval}ms") + return f"Live plot started in '{mode}' mode at {interval}ms interval" + + def stop_live_plot(self) -> str: + """ + Stop live plotting. + + Closes the live plot window and stops the update thread. + + Returns: + Status message + + Example: + >>> api.viz.stop_live_plot() + 'Live plotting stopped' + """ + if self.live_plotter and hasattr(self.live_plotter, 'running') and self.live_plotter.running: + self.live_plotter.stop() + logging.info("Live plotting stopped") + return "Live plotting stopped" + return "No live plot is currently running" + + def change_live_plot_mode(self, mode: str) -> str: + """ + Change the mode of an active live plot. + + Switches between different visualization modes without restarting + the plot window. + + Args: + mode: New plot mode (e.g., '3d', 'position', 'velocity', 'force') + + Returns: + Status message + + Example: + >>> api.viz.start_live_plot('3d') + >>> # ... watch for a while ... + >>> api.viz.change_live_plot_mode('position') + 'Live plot mode changed to position' + """ + if self.live_plotter and hasattr(self.live_plotter, 'running') and self.live_plotter.running: + if hasattr(self.live_plotter, 'change_mode'): + self.live_plotter.change_mode(mode) + logging.info(f"Live plot mode changed to: {mode}") + return f"Live plot mode changed to '{mode}'" + else: + return "Live plotter does not support mode changing" + return "No live plot is active to change mode" + + @staticmethod + def visualize_csv_log(csv_file: str, export: bool = False) -> None: + """ + Comprehensive visualization of CSV log file. + + Generates multiple plots (trajectory, joints, force) using the + KukaRSIVisualiser. Optionally exports plots to files. + + Args: + csv_file: Path to CSV log file + export: If True, save plots to disk instead of displaying + + Example: + >>> # Display interactive plots + >>> api.viz.visualize_csv_log('logs/test.csv') + + >>> # Export plots to files + >>> api.viz.visualize_csv_log('logs/test.csv', export=True) + + Note: + This generates three separate plots: + - 3D trajectory with start/end markers + - Joint angle evolution + - Motor current trends + """ + from .kuka_visualiser import KukaRSIVisualiser + + visualizer = KukaRSIVisualiser(csv_file) + visualizer.plot_trajectory() + visualizer.plot_joint_positions() + visualizer.plot_force_trends() + + if export: + visualizer.export_graphs() + logging.info(f"Exported visualizations for {csv_file}") + else: + logging.info(f"Displayed visualizations for {csv_file}") + + @staticmethod + def compare_runs(file1: str, file2: str) -> str: + """ + Generate comparison plots for two test runs. + + Visualizes deviations between two recorded trajectories for + repeatability analysis. + + Args: + file1: Path to first CSV log file + file2: Path to second CSV log file + + Returns: + Status message + + Example: + >>> api.viz.compare_runs('run1.csv', 'run2.csv') + 'Comparison plots generated successfully' + + Note: + Uses plot_static() with deviation mode internally. + """ + if not os.path.exists(file1): + return f"First file not found: {file1}" + if not os.path.exists(file2): + return f"Second file not found: {file2}" + + try: + from .static_plotter import StaticPlotter + StaticPlotter.plot_deviation(file1, file2) + logging.info(f"Generated comparison plot: {file1} vs {file2}") + return "Comparison plots generated successfully" + except Exception as e: + logging.error(f"Comparison plot failed: {e}") + return f"Failed to generate comparison: {str(e)}"