diff --git a/src/RSIPI/diagnostics_api.py b/src/RSIPI/diagnostics_api.py index 83f0244..bb0d78e 100644 --- a/src/RSIPI/diagnostics_api.py +++ b/src/RSIPI/diagnostics_api.py @@ -1,7 +1,7 @@ -"""Diagnostics API namespace for RSIPI (Phase 2 placeholder).""" +"""Diagnostics API namespace for RSIPI (Phase 2).""" import logging -from typing import Dict, Any, TYPE_CHECKING +from typing import Dict, Any, List, TYPE_CHECKING if TYPE_CHECKING: from .rsi_client import RSIClient @@ -11,14 +11,11 @@ class DiagnosticsAPI: """ Network and performance diagnostics interface for KUKA RSI robot control. - Placeholder for Phase 2 features including: - - Timing instrumentation (latency, jitter, cycle time) + Provides real-time access to: + - Timing metrics (latency, jitter, cycle time) - Network quality monitoring (packet loss, IPOC gaps) - - Watchdog timers + - Watchdog timer status - Communication health checks - - This namespace is currently a placeholder and will be fully implemented - in Phase 2 of the RSIPI improvement roadmap. """ def __init__(self, client: 'RSIClient') -> None: @@ -26,93 +23,232 @@ class DiagnosticsAPI: Initialize DiagnosticsAPI namespace. Args: - client: RSIClient instance + client: RSIClient instance with metrics_dict """ self.client = client - logging.debug("DiagnosticsAPI initialized (Phase 2 placeholder)") + logging.debug("DiagnosticsAPI initialized") def get_stats(self) -> Dict[str, Any]: """ - Get network and performance statistics. + Get comprehensive network and performance statistics. Returns: - Dictionary with diagnostic information + Dictionary with diagnostic information: + - mean_cycle_time: Average cycle time in seconds + - std_cycle_time: Standard deviation (jitter) + - min_cycle_time: Minimum cycle time + - max_cycle_time: Maximum cycle time + - jitter: Cycle time variance (alias for std) + - packet_loss_rate: Packet loss percentage + - ipoc_gap_rate: IPOC gaps per 1000 cycles + - total_cycles: Total cycles recorded + - uptime: Time since start in seconds + - is_healthy: Overall health boolean + - warnings: List of warning messages + - watchdog_timeout: Whether watchdog timed out - Note: - This is a Phase 2 placeholder. Currently returns basic status only. - - TODO (Phase 2): - - Packet loss rate - - IPOC gap detection - - Average/max/min cycle time - - Jitter measurements - - Buffer health metrics + Example: + >>> stats = api.diagnostics.get_stats() + >>> print(f"Jitter: {stats['jitter']*1000:.2f}ms") + >>> print(f"Packet loss: {stats['packet_loss_rate']:.2f}%") """ - return { - "status": "Phase 2 placeholder", - "client_state": self.client.state.name if hasattr(self.client, 'state') else "unknown", - "is_running": self.client.is_running() if hasattr(self.client, 'is_running') else False, - "note": "Full diagnostics implementation coming in Phase 2" - } + if not hasattr(self.client, 'metrics_dict'): + return {"error": "Metrics not available"} + + # Return a copy of the metrics dict + return dict(self.client.metrics_dict) def get_timing(self) -> Dict[str, float]: """ - Get timing metrics (latency, jitter, cycle time). + Get timing-specific metrics. Returns: - Dictionary with timing statistics + Dictionary with timing statistics: + - mean_cycle_time: Average in seconds + - std_cycle_time: Standard deviation + - min_cycle_time: Minimum + - max_cycle_time: Maximum + - jitter: Variance (alias) - Note: - This is a Phase 2 placeholder. - - TODO (Phase 2): - - Round-trip latency (min/max/avg) - - Cycle time distribution - - Jitter analysis - - Timing violations count + Example: + >>> timing = api.diagnostics.get_timing() + >>> print(f"Avg cycle: {timing['mean_cycle_time']*1000:.2f}ms") + >>> print(f"Jitter: {timing['jitter']*1000:.2f}ms") """ + stats = self.get_stats() + + if 'error' in stats: + return stats + return { - "note": "Phase 2 placeholder - timing instrumentation not yet implemented" + 'mean_cycle_time': stats.get('mean_cycle_time', 0.0), + 'std_cycle_time': stats.get('std_cycle_time', 0.0), + 'min_cycle_time': stats.get('min_cycle_time', 0.0), + 'max_cycle_time': stats.get('max_cycle_time', 0.0), + 'jitter': stats.get('jitter', 0.0), + } + + def get_network_quality(self) -> Dict[str, float]: + """ + Get network quality metrics. + + Returns: + Dictionary with network metrics: + - packet_loss_rate: Percentage of lost packets + - ipoc_gap_rate: IPOC gaps per 1000 cycles + - total_cycles: Total communication cycles + + Example: + >>> quality = api.diagnostics.get_network_quality() + >>> if quality['packet_loss_rate'] > 1.0: + ... print("Warning: High packet loss!") + """ + stats = self.get_stats() + + if 'error' in stats: + return stats + + return { + 'packet_loss_rate': stats.get('packet_loss_rate', 0.0), + 'ipoc_gap_rate': stats.get('ipoc_gap_rate', 0.0), + 'total_cycles': stats.get('total_cycles', 0), } def is_healthy(self) -> bool: """ Check overall system health. + Evaluates: + - Jitter within acceptable limits (< 2ms) + - Packet loss < 1% + - No watchdog timeout + - Client in RUNNING state + Returns: - True if system is healthy, False otherwise + True if all health checks pass + + Example: + >>> if not api.diagnostics.is_healthy(): + ... warnings = api.diagnostics.get_warnings() + ... for w in warnings: + ... print(f"Warning: {w}") + """ + if not hasattr(self.client, 'metrics_dict'): + return False + + if not self.client.is_running(): + return False + + stats = dict(self.client.metrics_dict) + return stats.get('is_healthy', False) + + def get_warnings(self) -> List[str]: + """ + Get current network health warnings. + + Returns: + List of warning messages (empty if healthy) + + Example: + >>> warnings = api.diagnostics.get_warnings() + >>> for warning in warnings: + ... print(f"⚠️ {warning}") + """ + if not hasattr(self.client, 'metrics_dict'): + return ["Metrics not available"] + + stats = dict(self.client.metrics_dict) + return stats.get('warnings', []) + + def check_watchdog(self) -> bool: + """ + Check if watchdog timer has triggered. + + The watchdog detects communication loss when no packets + are received for >1 second. + + Returns: + True if watchdog timeout detected + + Example: + >>> if api.diagnostics.check_watchdog(): + ... print("Communication lost!") + ... api.reconnect() + """ + if not hasattr(self.client, 'metrics_dict'): + return False + + stats = dict(self.client.metrics_dict) + return stats.get('watchdog_timeout', False) + + def get_uptime(self) -> float: + """ + Get network uptime in seconds. + + Returns: + Seconds since network process started + + Example: + >>> uptime = api.diagnostics.get_uptime() + >>> hours = uptime / 3600 + >>> print(f"Uptime: {hours:.1f} hours") + """ + stats = self.get_stats() + return stats.get('uptime', 0.0) + + def reset_metrics(self) -> None: + """ + Reset all diagnostic metrics. Note: - This is a Phase 2 placeholder. Currently checks only basic state. - - TODO (Phase 2): - - Watchdog timer status - - Communication timeout detection - - IPOC continuity check - - Buffer overflow detection - - Unexpected state detection + This is not yet implemented. Metrics are automatically + reset on reconnect(). """ - if not hasattr(self.client, 'is_running'): - return False - return self.client.is_running() + logging.warning("reset_metrics() not yet implemented - use reconnect() to reset") - # TODO (Phase 2): Implement full diagnostic features - # def start_watchdog(self, timeout: float = 1.0) -> None: - # """Start watchdog timer for communication monitoring.""" - # pass - # - # def stop_watchdog(self) -> None: - # """Stop watchdog timer.""" - # pass - # - # def get_network_quality(self) -> Dict[str, float]: - # """Get network quality metrics (packet loss, latency variance).""" - # pass - # - # def get_ipoc_gaps(self) -> List[int]: - # """Detect gaps in IPOC sequence indicating missed packets.""" - # pass - # - # def reset_metrics(self) -> None: - # """Reset all diagnostic counters and statistics.""" - # pass + def format_stats(self) -> str: + """ + Format statistics as human-readable string. + + Returns: + Formatted string with key metrics + + Example: + >>> print(api.diagnostics.format_stats()) + Network Diagnostics: + Cycle Time: 4.01ms (±0.12ms jitter) + Packet Loss: 0.05% + IPOC Gaps: 0.2 per 1000 cycles + Uptime: 120.5s + Health: ✅ Healthy + """ + stats = self.get_stats() + + if 'error' in stats: + return f"Diagnostics Error: {stats['error']}" + + mean_ct = stats.get('mean_cycle_time', 0) * 1000 # Convert to ms + jitter = stats.get('jitter', 0) * 1000 + packet_loss = stats.get('packet_loss_rate', 0) + ipoc_gaps = stats.get('ipoc_gap_rate', 0) + uptime = stats.get('uptime', 0) + is_healthy = stats.get('is_healthy', False) + warnings = stats.get('warnings', []) + + health_icon = "✅" if is_healthy else "⚠️" + health_text = "Healthy" if is_healthy else "Issues Detected" + + output = f"""Network Diagnostics: + Cycle Time: {mean_ct:.2f}ms (±{jitter:.2f}ms jitter) + Packet Loss: {packet_loss:.2f}% + IPOC Gaps: {ipoc_gaps:.1f} per 1000 cycles + Total Cycles: {stats.get('total_cycles', 0)} + Uptime: {uptime:.1f}s + Health: {health_icon} {health_text}""" + + if warnings: + output += "\n Warnings:" + for warning in warnings: + output += f"\n - {warning}" + + return output diff --git a/src/RSIPI/network_handler.py b/src/RSIPI/network_handler.py index e9d3476..081888f 100644 --- a/src/RSIPI/network_handler.py +++ b/src/RSIPI/network_handler.py @@ -9,6 +9,7 @@ from typing import Dict, Any, Tuple, Optional from .xml_handler import XMLGenerator from .safety_manager import SafetyManager from .exceptions import RSINetworkError, RSITimeoutError, RSIPacketError, RSILoggingError +from .timing_metrics import TimingMetrics class CSVLogger(multiprocessing.Process): @@ -95,7 +96,8 @@ class NetworkProcess(multiprocessing.Process): stop_event: multiprocessing.Event, config_parser: Any, # ConfigParser type start_event: multiprocessing.Event, - command_queue: multiprocessing.Queue + command_queue: multiprocessing.Queue, + metrics_dict: Optional[Any] = None # multiprocessing.Manager().dict() ) -> None: """ Initialize network process. @@ -109,6 +111,7 @@ class NetworkProcess(multiprocessing.Process): config_parser: ConfigParser instance with network settings start_event: Event to signal when to start communication command_queue: Queue for receiving commands from parent process + metrics_dict: Optional shared dict for timing metrics """ super().__init__() self.send_variables = send_variables @@ -130,6 +133,10 @@ class NetworkProcess(multiprocessing.Process): self.log_stop_event: Optional[multiprocessing.Event] = None self.csv_logger: Optional[CSVLogger] = None + # Timing metrics (Phase 2) + self.metrics_dict = metrics_dict + self.timing_metrics: Optional[TimingMetrics] = None + def run(self) -> None: """ Start the network loop. @@ -137,6 +144,11 @@ class NetworkProcess(multiprocessing.Process): Waits for start signal, then initializes socket and begins communication loop. Ensures cleanup on exit. """ + # Initialize timing metrics in child process + if self.metrics_dict is not None: + self.timing_metrics = TimingMetrics() + logging.info("Timing metrics initialized") + # Wait for start signal, but check stop_event periodically to allow clean shutdown while not self.start_event.wait(timeout=0.5): if self.stop_event.is_set(): @@ -169,8 +181,10 @@ class NetworkProcess(multiprocessing.Process): Main communication loop. Receives UDP messages from robot, processes them, sends responses, - and optionally logs data to CSV. + and optionally logs data to CSV. Records timing metrics if enabled. """ + update_counter = 0 # For periodic metrics updates + while not self.stop_event.is_set(): # Check for commands (non-blocking) self._process_commands() @@ -183,11 +197,25 @@ class NetworkProcess(multiprocessing.Process): send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings) self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port) + # Record timing metrics (Phase 2) + if self.timing_metrics is not None: + ipoc = self.receive_variables.get("IPOC", 0) + self.timing_metrics.record_cycle(ipoc) + + # Update shared metrics dict every 100 cycles (~400ms) + update_counter += 1 + if update_counter >= 100: + self._update_metrics_dict() + update_counter = 0 + if self.logging_active.value and self.log_queue: self._queue_log_entry() except socket.timeout: logging.warning("No message received within timeout period") + # Check watchdog on timeout + if self.timing_metrics and self.timing_metrics.check_watchdog(): + logging.error("Watchdog timeout - communication lost!") except Exception as e: logging.error(f"Network process error: {e}") @@ -210,6 +238,26 @@ class NetworkProcess(multiprocessing.Process): except Exception as e: logging.error(f"Error processing command: {e}") + def _update_metrics_dict(self) -> None: + """Update shared metrics dictionary with current timing statistics (Phase 2).""" + if self.metrics_dict is None or self.timing_metrics is None: + return + + try: + stats = self.timing_metrics.get_current_stats() + health = self.timing_metrics.get_health_status() + + # Update shared dict (Manager dict supports item assignment) + for key, value in stats.items(): + self.metrics_dict[key] = value + + self.metrics_dict['is_healthy'] = health['is_healthy'] + self.metrics_dict['warnings'] = health['warnings'] + self.metrics_dict['watchdog_timeout'] = health['watchdog_timeout'] + + except Exception as e: + logging.debug(f"Failed to update metrics dict: {e}") + def _queue_log_entry(self) -> None: """Queue current state for CSV logging (non-blocking).""" try: diff --git a/src/RSIPI/rsi_client.py b/src/RSIPI/rsi_client.py index 9a8eb9e..5e98d0e 100644 --- a/src/RSIPI/rsi_client.py +++ b/src/RSIPI/rsi_client.py @@ -61,6 +61,9 @@ class RSIClient: # Shared logging state (readable from parent process) self._logging_active = multiprocessing.Value('b', False) + # Shared metrics dictionary (Phase 2) + self.metrics_dict = self.manager.dict() + # Create NetworkProcess but don't start communication yet self.network_process: NetworkProcess = NetworkProcess( network_settings["ip"], @@ -70,7 +73,8 @@ class RSIClient: self.stop_event, self.config_parser, self.start_event, - self.command_queue + self.command_queue, + self.metrics_dict ) # Share the logging_active flag self.network_process.logging_active = self._logging_active @@ -200,6 +204,9 @@ class RSIClient: self.start_event = multiprocessing.Event() self.command_queue = multiprocessing.Queue() + # Reset metrics dictionary (Phase 2) + self.metrics_dict.clear() + # Create new network process network_settings = self.config_parser.get_network_settings() self.network_process = NetworkProcess( @@ -210,7 +217,8 @@ class RSIClient: self.stop_event, self.config_parser, self.start_event, - self.command_queue + self.command_queue, + self.metrics_dict ) self.network_process.logging_active = self._logging_active self.network_process.start() diff --git a/src/RSIPI/timing_metrics.py b/src/RSIPI/timing_metrics.py new file mode 100644 index 0000000..b8a7de9 --- /dev/null +++ b/src/RSIPI/timing_metrics.py @@ -0,0 +1,304 @@ +""" +Timing instrumentation for RSI network communication. + +Tracks latency, jitter, cycle time, and network quality metrics for +diagnostic analysis and performance monitoring. +""" + +import time +import logging +from collections import deque +from typing import Dict, List, Optional, Tuple +from dataclasses import dataclass, field +import statistics + + +@dataclass +class TimingSnapshot: + """Single timing measurement snapshot.""" + timestamp: float + cycle_time: float + ipoc: int + packet_received: bool = True + + +@dataclass +class TimingMetrics: + """ + Real-time timing metrics for RSI communication. + + Tracks latency, jitter, cycle time, IPOC gaps, and packet loss + with configurable history window for statistical analysis. + """ + + # Configuration + history_size: int = 1000 # Number of samples to retain + expected_cycle_time: float = 0.004 # 4ms nominal cycle (250Hz) + + # Internal state + cycle_times: deque = field(default_factory=lambda: deque(maxlen=1000)) + timestamps: deque = field(default_factory=lambda: deque(maxlen=1000)) + ipoc_values: deque = field(default_factory=lambda: deque(maxlen=1000)) + + # Statistics + total_cycles: int = 0 + total_packets_lost: int = 0 + total_ipoc_gaps: int = 0 + + # Timing state + last_timestamp: Optional[float] = None + last_ipoc: Optional[int] = None + start_time: float = field(default_factory=time.time) + + # Watchdog + watchdog_timeout: float = 1.0 # 1 second + last_packet_time: float = field(default_factory=time.time) + + def __post_init__(self): + """Adjust deque maxlen to match history_size.""" + self.cycle_times = deque(maxlen=self.history_size) + self.timestamps = deque(maxlen=self.history_size) + self.ipoc_values = deque(maxlen=self.history_size) + + def record_cycle(self, ipoc: int) -> None: + """ + Record a successful communication cycle. + + Args: + ipoc: Current IPOC value from robot controller + """ + current_time = time.time() + + # Calculate cycle time + if self.last_timestamp is not None: + cycle_time = current_time - self.last_timestamp + self.cycle_times.append(cycle_time) + + # Check for IPOC gaps (missed packets) + if self.last_ipoc is not None: + expected_ipoc = self.last_ipoc + 4 # IPOC increments by 4 each cycle + ipoc_gap = ipoc - expected_ipoc + + if ipoc_gap != 0: + self.total_ipoc_gaps += 1 + packets_lost = ipoc_gap // 4 + self.total_packets_lost += packets_lost + logging.warning(f"IPOC gap detected: {ipoc_gap} (lost ~{packets_lost} packets)") + + # Record values + self.timestamps.append(current_time) + self.ipoc_values.append(ipoc) + self.last_timestamp = current_time + self.last_ipoc = ipoc + self.last_packet_time = current_time + self.total_cycles += 1 + + def check_watchdog(self) -> bool: + """ + Check if communication has timed out. + + Returns: + True if watchdog timeout exceeded, False otherwise + """ + if self.last_packet_time is None: + return False + + elapsed = time.time() - self.last_packet_time + return elapsed > self.watchdog_timeout + + def get_current_stats(self) -> Dict[str, float]: + """ + Get current timing statistics. + + Returns: + Dictionary with timing metrics: + - mean_cycle_time: Average cycle time in seconds + - std_cycle_time: Standard deviation of cycle time + - min_cycle_time: Minimum cycle time + - max_cycle_time: Maximum cycle time + - jitter: Cycle time standard deviation (alias) + - packet_loss_rate: Percentage of packets lost + - ipoc_gap_rate: IPOC gaps per 1000 cycles + - total_cycles: Total cycles recorded + - uptime: Total time since start in seconds + """ + if not self.cycle_times: + return { + "mean_cycle_time": 0.0, + "std_cycle_time": 0.0, + "min_cycle_time": 0.0, + "max_cycle_time": 0.0, + "jitter": 0.0, + "packet_loss_rate": 0.0, + "ipoc_gap_rate": 0.0, + "total_cycles": 0, + "uptime": time.time() - self.start_time, + } + + cycle_times_list = list(self.cycle_times) + mean_ct = statistics.mean(cycle_times_list) + std_ct = statistics.stdev(cycle_times_list) if len(cycle_times_list) > 1 else 0.0 + + packet_loss_rate = (self.total_packets_lost / max(self.total_cycles, 1)) * 100 + ipoc_gap_rate = (self.total_ipoc_gaps / max(self.total_cycles, 1)) * 1000 + + return { + "mean_cycle_time": mean_ct, + "std_cycle_time": std_ct, + "min_cycle_time": min(cycle_times_list), + "max_cycle_time": max(cycle_times_list), + "jitter": std_ct, # Jitter is typically measured as std deviation + "packet_loss_rate": packet_loss_rate, + "ipoc_gap_rate": ipoc_gap_rate, + "total_cycles": self.total_cycles, + "uptime": time.time() - self.start_time, + } + + def get_detailed_stats(self) -> Dict[str, any]: + """ + Get detailed statistics including percentiles. + + Returns: + Dictionary with detailed metrics including percentiles + """ + stats = self.get_current_stats() + + if self.cycle_times: + cycle_times_sorted = sorted(self.cycle_times) + n = len(cycle_times_sorted) + + stats.update({ + "p50_cycle_time": cycle_times_sorted[n // 2], + "p95_cycle_time": cycle_times_sorted[int(n * 0.95)], + "p99_cycle_time": cycle_times_sorted[int(n * 0.99)], + "samples": n, + }) + + return stats + + def get_health_status(self) -> Dict[str, any]: + """ + Get overall health status with warnings. + + Returns: + Dictionary with health indicators and warnings + """ + stats = self.get_current_stats() + watchdog_timeout = self.check_watchdog() + + warnings = [] + is_healthy = True + + # Check for watchdog timeout + if watchdog_timeout: + warnings.append("Communication timeout - no packets received") + is_healthy = False + + # Check for high jitter + if stats["jitter"] > 0.002: # 2ms jitter threshold + warnings.append(f"High jitter detected: {stats['jitter']*1000:.2f}ms") + is_healthy = False + + # Check for packet loss + if stats["packet_loss_rate"] > 1.0: # 1% packet loss threshold + warnings.append(f"Packet loss detected: {stats['packet_loss_rate']:.2f}%") + is_healthy = False + + # Check for high cycle time + if stats["mean_cycle_time"] > (self.expected_cycle_time * 1.5): + warnings.append(f"Cycle time exceeds expected: {stats['mean_cycle_time']*1000:.2f}ms") + is_healthy = False + + return { + "is_healthy": is_healthy, + "warnings": warnings, + "watchdog_timeout": watchdog_timeout, + "stats": stats, + } + + def reset(self) -> None: + """Reset all metrics to initial state.""" + self.cycle_times.clear() + self.timestamps.clear() + self.ipoc_values.clear() + self.total_cycles = 0 + self.total_packets_lost = 0 + self.total_ipoc_gaps = 0 + self.last_timestamp = None + self.last_ipoc = None + self.start_time = time.time() + self.last_packet_time = time.time() + logging.info("Timing metrics reset") + + +class NetworkQualityMonitor: + """ + High-level network quality monitoring. + + Provides easy access to network health and performance metrics + with automatic threshold-based alerting. + """ + + def __init__( + self, + metrics: TimingMetrics, + jitter_threshold: float = 0.002, + packet_loss_threshold: float = 1.0, + cycle_time_threshold: float = 0.006, + ): + """ + Initialize network quality monitor. + + Args: + metrics: TimingMetrics instance to monitor + jitter_threshold: Jitter threshold in seconds (default: 2ms) + packet_loss_threshold: Packet loss rate threshold % (default: 1%) + cycle_time_threshold: Cycle time threshold in seconds (default: 6ms) + """ + self.metrics = metrics + self.jitter_threshold = jitter_threshold + self.packet_loss_threshold = packet_loss_threshold + self.cycle_time_threshold = cycle_time_threshold + + def is_healthy(self) -> bool: + """ + Check if network is healthy. + + Returns: + True if all metrics within acceptable thresholds + """ + health = self.metrics.get_health_status() + return health["is_healthy"] + + def get_warnings(self) -> List[str]: + """ + Get current network warnings. + + Returns: + List of warning messages + """ + health = self.metrics.get_health_status() + return health["warnings"] + + def get_quality_score(self) -> float: + """ + Calculate overall network quality score (0-100). + + Returns: + Quality score where 100 is perfect, 0 is unusable + """ + stats = self.metrics.get_current_stats() + + if stats["total_cycles"] == 0: + return 0.0 + + # Score components (each 0-100) + jitter_score = max(0, 100 - (stats["jitter"] / self.jitter_threshold) * 100) + loss_score = max(0, 100 - (stats["packet_loss_rate"] / self.packet_loss_threshold) * 100) + cycle_score = max(0, 100 - ((stats["mean_cycle_time"] - self.metrics.expected_cycle_time) / + (self.cycle_time_threshold - self.metrics.expected_cycle_time)) * 100) + + # Weighted average + quality = (jitter_score * 0.4 + loss_score * 0.4 + cycle_score * 0.2) + + return min(100.0, max(0.0, quality))