class ReconnectStrategy(Enum):
    """Reconnection strategy options."""

    IMMEDIATE = auto()            # Retry without waiting
    LINEAR_BACKOFF = auto()       # Delay grows linearly: base * attempt
    EXPONENTIAL_BACKOFF = auto()  # Delay doubles each retry: base * 2**(attempt - 1)


class AutoReconnectManager:
    """
    Automatic reconnection manager for RSI communication.

    A background daemon thread polls ``client.metrics_dict`` every
    ``check_interval`` seconds. When the ``watchdog_timeout`` flag is set
    while ``client.is_running()`` is true, ``client.reconnect()`` is retried
    according to the configured strategy until the connection verifies as
    healthy, the retry budget is exhausted, or the manager is stopped.
    """

    # Seconds to let a fresh connection settle before it is verified.
    STABILIZE_DELAY = 2.0
    # Seconds to wait for the client's metrics to refresh during verification.
    VERIFY_DELAY = 1.0

    def __init__(
        self,
        client: 'RSIClient',
        enabled: bool = True,
        check_interval: float = 2.0,
        max_retries: int = 5,
        retry_delay: float = 5.0,
        strategy: ReconnectStrategy = ReconnectStrategy.LINEAR_BACKOFF,
        on_reconnect: Optional[Callable[[], object]] = None,
        on_failure: Optional[Callable[[], object]] = None,
        max_delay: Optional[float] = None,
    ):
        """
        Initialize auto-reconnect manager.

        Args:
            client: RSIClient instance to monitor
            enabled: Whether auto-reconnect is enabled
            check_interval: How often to check health (seconds)
            max_retries: Maximum reconnection attempts (0 = unlimited)
            retry_delay: Base delay between retries (seconds)
            strategy: Reconnection strategy (IMMEDIATE, LINEAR_BACKOFF, EXPONENTIAL_BACKOFF)
            on_reconnect: Optional callback called after successful reconnect
            on_failure: Optional callback called when max retries exceeded
            max_delay: Optional upper bound (seconds) for the backoff delay.
                ``None`` (default) leaves the delay uncapped, which can grow
                very large when ``max_retries`` is 0 (unlimited).
        """
        self.client = client
        self.enabled = enabled
        self.check_interval = check_interval
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.strategy = strategy
        self.on_reconnect = on_reconnect
        self.on_failure = on_failure
        self.max_delay = max_delay

        self._monitor_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._running = False

        # Statistics
        self.total_reconnects = 0
        self.failed_reconnects = 0
        self.last_reconnect_time: Optional[float] = None

    def start(self) -> None:
        """Start the auto-reconnect monitor thread (no-op if running or disabled)."""
        if self._running:
            logging.warning("Auto-reconnect manager already running")
            return

        if not self.enabled:
            logging.info("Auto-reconnect is disabled")
            return

        self._stop_event.clear()
        self._running = True

        # If start() is reached from the monitor thread itself (for example
        # when a reconnect path restarts the client), keep using that thread
        # instead of spawning a second monitor that would run alongside it.
        if self._monitor_thread is threading.current_thread():
            logging.info("Auto-reconnect manager started")
            return

        self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._monitor_thread.start()
        logging.info("Auto-reconnect manager started")

    def stop(self) -> None:
        """
        Stop the auto-reconnect monitor thread.

        Safe to call from any thread, including the monitor thread itself
        (e.g. from an ``on_failure`` callback); in that case the thread is
        signalled but not joined, because a thread cannot join itself.
        """
        if not self._running:
            return

        self._running = False
        self._stop_event.set()

        monitor = self._monitor_thread
        if (
            monitor is not None
            and monitor.is_alive()
            and monitor is not threading.current_thread()
        ):
            monitor.join(timeout=5)

        logging.info("Auto-reconnect manager stopped")

    def _monitor_loop(self) -> None:
        """Main monitoring loop (runs in background thread)."""
        while not self._stop_event.is_set():
            try:
                # Check if watchdog has timed out
                if hasattr(self.client, 'metrics_dict'):
                    # Snapshot the metrics so both reads see the same state
                    metrics = dict(self.client.metrics_dict)
                    watchdog_timeout = metrics.get('watchdog_timeout', False)

                    if watchdog_timeout and self.client.is_running():
                        logging.error("Watchdog timeout detected - initiating auto-reconnect")
                        self._attempt_reconnection()

            except Exception as e:
                # Thread boundary: log and keep monitoring rather than die silently
                logging.error(f"Error in auto-reconnect monitor: {e}")

            # Sleep with interruptible wait
            self._stop_event.wait(self.check_interval)

    def _compute_delay(self, retry_count: int) -> float:
        """Return the wait (seconds) before the retry following attempt ``retry_count``."""
        if self.strategy == ReconnectStrategy.IMMEDIATE:
            delay = 0
        elif self.strategy == ReconnectStrategy.LINEAR_BACKOFF:
            delay = self.retry_delay * retry_count
        elif self.strategy == ReconnectStrategy.EXPONENTIAL_BACKOFF:
            delay = self.retry_delay * (2 ** (retry_count - 1))
        else:
            delay = self.retry_delay

        if self.max_delay is not None:
            delay = min(delay, self.max_delay)
        return delay

    @staticmethod
    def _invoke_callback(callback: Optional[Callable[[], object]], label: str) -> None:
        """Run a user callback, logging (not propagating) any exception it raises."""
        if not callback:
            return
        try:
            callback()
        except Exception as e:
            logging.error(f"Error in {label} callback: {e}")

    def _attempt_reconnection(self) -> bool:
        """
        Attempt to reconnect with configured retry logic.

        All waits go through the stop event, so ``stop()`` interrupts a
        reconnection sequence promptly instead of blocking on ``time.sleep``.

        Returns:
            True if reconnection successful, False if retries were exhausted
            or the manager was stopped.
        """
        retry_count = 0

        while True:
            # Check if we've exceeded max retries
            if self.max_retries > 0 and retry_count >= self.max_retries:
                logging.error(f"Max reconnection retries ({self.max_retries}) exceeded")
                self.failed_reconnects += 1
                self._invoke_callback(self.on_failure, "on_failure")
                return False

            retry_count += 1
            logging.info(f"Reconnection attempt {retry_count}/{self.max_retries if self.max_retries > 0 else '∞'}")

            try:
                self.client.reconnect()

                # Let the connection stabilize; bail out if stopped meanwhile
                if self._stop_event.wait(self.STABILIZE_DELAY):
                    return False

                if self._verify_connection():
                    logging.info(f"✅ Reconnection successful after {retry_count} attempt(s)")
                    self.total_reconnects += 1
                    self.last_reconnect_time = time.time()
                    self._invoke_callback(self.on_reconnect, "on_reconnect")
                    return True

                logging.warning("Reconnection completed but connection verification failed")

            except Exception as e:
                logging.error(f"Reconnection attempt {retry_count} failed: {e}")

            delay = self._compute_delay(retry_count)
            if delay > 0:
                logging.info(f"Waiting {delay:.1f}s before next reconnection attempt...")
                self._stop_event.wait(delay)

            # Check if we were stopped during the attempt or the wait
            if self._stop_event.is_set():
                return False

    def _verify_connection(self) -> bool:
        """
        Verify that the connection is actually working.

        Returns:
            True if the client reports at least one cycle and no watchdog
            timeout; False otherwise (including when stopped while waiting).
        """
        # Wait a moment for metrics to update (interruptible by stop())
        if self._stop_event.wait(self.VERIFY_DELAY):
            return False

        if not hasattr(self.client, 'metrics_dict'):
            return False

        metrics = dict(self.client.metrics_dict)

        # Check that we're receiving packets
        if metrics.get('total_cycles', 0) == 0:
            return False

        # A missing flag is treated as "still timing out"
        return not metrics.get('watchdog_timeout', True)

    def get_stats(self) -> dict:
        """
        Get auto-reconnect statistics.

        Returns:
            Dictionary with reconnection statistics
        """
        return {
            'enabled': self.enabled,
            'running': self._running,
            'total_reconnects': self.total_reconnects,
            'failed_reconnects': self.failed_reconnects,
            'last_reconnect_time': self.last_reconnect_time,
            'strategy': self.strategy.name,
            'max_retries': self.max_retries,
            'retry_delay': self.retry_delay,
        }
Args: config_file: Path to RSI_EthernetConfig.xml rsi_limits_file: Optional path to .rsi.xml safety limits file + enable_auto_reconnect: Enable automatic reconnection on communication loss + auto_reconnect_retries: Maximum reconnection attempts (0 = unlimited) + auto_reconnect_delay: Base delay between retries in seconds """ logging.info(f"Loading RSI configuration from {config_file}...") @@ -84,6 +95,18 @@ class RSIClient: self.running: bool = False self.thread: Optional[Thread] = None + # Auto-reconnect manager (Phase 2) + self.auto_reconnect_manager: Optional[AutoReconnectManager] = None + if enable_auto_reconnect: + self.auto_reconnect_manager = AutoReconnectManager( + client=self, + enabled=True, + max_retries=auto_reconnect_retries, + retry_delay=auto_reconnect_delay, + strategy=ReconnectStrategy.LINEAR_BACKOFF + ) + logging.info("Auto-reconnect enabled") + @property def state(self) -> ClientState: """Get current client state (thread-safe).""" @@ -138,6 +161,10 @@ class RSIClient: self.running = True logging.info("RSI Client Started") + # Start auto-reconnect monitor (Phase 2) + if self.auto_reconnect_manager: + self.auto_reconnect_manager.start() + try: while self.running and not self.stop_event.is_set(): time.sleep(2) @@ -174,6 +201,10 @@ class RSIClient: self.thread.join(timeout=2) self.thread = None + # Stop auto-reconnect monitor (Phase 2) + if self.auto_reconnect_manager: + self.auto_reconnect_manager.stop() + self._transition_to(ClientState.STOPPED) logging.info("RSI Client Stopped") diff --git a/tests/stability_test.py b/tests/stability_test.py new file mode 100644 index 0000000..1b3dcac --- /dev/null +++ b/tests/stability_test.py @@ -0,0 +1,335 @@ +""" +24-Hour RSI Stability Test + +Long-duration stability test for RSIPI network communication. +Monitors connection health, tracks metrics, and generates detailed +performance reports. 
class StabilityTest:
    """
    Long-duration stability test for RSI communication.

    Periodically samples ``api.diagnostics.get_stats()`` for the configured
    duration, then writes a JSON report and prints a summary.
    """

    def __init__(
        self,
        config_file: str,
        duration_hours: float,
        output_file: str,
        check_interval: float = 60.0
    ):
        """
        Initialize stability test.

        Args:
            config_file: Path to RSI config file
            duration_hours: Test duration in hours
            output_file: Path for results JSON file
            check_interval: How often to sample metrics (seconds)
        """
        self.config_file = config_file
        self.duration_hours = duration_hours
        self.output_file = output_file
        self.check_interval = check_interval

        self.start_time = None  # epoch seconds, set by run()
        self.end_time = None    # epoch seconds, set when run() finishes
        self.samples = []       # one dict per successful _collect_sample()
        self.api = None         # RSIAPI instance, created by setup()

    def setup(self) -> None:
        """
        Set up logging and RSI connection.

        Raises:
            RuntimeError: If the API does not report running after start-up.
        """
        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(f'stability_test_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
                logging.StreamHandler()
            ]
        )

        logging.info("=== RSI Stability Test ===")
        logging.info(f"Config: {self.config_file}")
        logging.info(f"Duration: {self.duration_hours} hours")
        logging.info(f"Check interval: {self.check_interval}s")
        logging.info(f"Output: {self.output_file}")
        logging.info("=" * 50)

        # Initialize API with auto-reconnect enabled
        self.api = RSIAPI(
            self.config_file,
            enable_auto_reconnect=True,
            auto_reconnect_retries=0,  # Unlimited retries
            auto_reconnect_delay=10.0
        )

        logging.info("Starting RSI communication...")
        self.api.start()

        # Wait for connection to stabilize
        time.sleep(3)

        if not self.api.is_running():
            raise RuntimeError("Failed to start RSI communication")

        logging.info("✅ RSI communication started successfully")

    def run(self) -> None:
        """
        Run the stability test.

        Samples metrics every ``check_interval`` seconds until the duration
        elapses or the user interrupts; cleanup and reporting always run.
        """
        self.start_time = time.time()
        end_time = self.start_time + (self.duration_hours * 3600)

        sample_count = 0
        error_count = 0

        logging.info(f"Test started at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logging.info(f"Will run until {datetime.datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S')}")

        try:
            while time.time() < end_time:
                try:
                    # Collect metrics sample
                    sample = self._collect_sample()
                    self.samples.append(sample)
                    sample_count += 1

                    # Log progress
                    elapsed_hours = (time.time() - self.start_time) / 3600
                    remaining_hours = self.duration_hours - elapsed_hours
                    progress = (elapsed_hours / self.duration_hours) * 100

                    self._log_progress(sample, elapsed_hours, remaining_hours, progress, sample_count, error_count)

                except Exception as e:
                    # A failed sample must not abort a multi-hour run
                    error_count += 1
                    logging.error(f"Error collecting sample: {e}")

                # Sleep until next check, but never past the scheduled end
                remaining = end_time - time.time()
                time.sleep(max(0.0, min(self.check_interval, remaining)))

        except KeyboardInterrupt:
            logging.warning("\n⚠️  Test interrupted by user")

        finally:
            self.end_time = time.time()
            self._cleanup()

    def _collect_sample(self) -> dict:
        """Collect a single metrics sample from the API diagnostics."""
        stats = self.api.diagnostics.get_stats()

        return {
            'timestamp': time.time(),
            'mean_cycle_time': stats.get('mean_cycle_time', 0),
            'jitter': stats.get('jitter', 0),
            'packet_loss_rate': stats.get('packet_loss_rate', 0),
            'ipoc_gap_rate': stats.get('ipoc_gap_rate', 0),
            'total_cycles': stats.get('total_cycles', 0),
            'is_healthy': stats.get('is_healthy', False),
            'warnings': stats.get('warnings', []),
            'uptime': stats.get('uptime', 0),
        }

    def _log_progress(
        self,
        sample: dict,
        elapsed_hours: float,
        remaining_hours: float,
        progress: float,
        sample_count: int,
        error_count: int
    ) -> None:
        """Log current progress and any warnings carried by the sample."""
        health_icon = "✅" if sample['is_healthy'] else "⚠️"

        logging.info(
            f"{health_icon} Progress: {progress:.1f}% | "
            f"Elapsed: {elapsed_hours:.2f}h | "
            f"Remaining: {remaining_hours:.2f}h | "
            f"Samples: {sample_count} | "
            f"Jitter: {sample['jitter']*1000:.2f}ms | "
            f"Loss: {sample['packet_loss_rate']:.2f}%"
        )

        for warning in sample['warnings']:
            logging.warning(f"  ⚠️  {warning}")

    def _cleanup(self) -> None:
        """
        Clean up and generate report.

        A failure while stopping the API is logged rather than raised so the
        collected samples are still written to ``output_file``.
        """
        logging.info("\n=== Test Complete ===")

        # Stop RSI (api is None when setup() never completed)
        if self.api is not None:
            logging.info("Stopping RSI communication...")
            try:
                self.api.stop()
            except Exception as e:
                logging.error(f"Error stopping RSI communication: {e}")

        # Generate report
        logging.info("Generating report...")
        report = self._generate_report()

        # Save results
        with open(self.output_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2)

        logging.info(f"✅ Report saved to: {self.output_file}")

        # Print summary
        self._print_summary(report)

    def _generate_report(self) -> dict:
        """
        Generate comprehensive test report.

        Returns:
            The report dictionary, or ``{'error': 'No samples collected'}``
            when no sample was gathered.
        """
        # Imported here so the method does not depend on a module-level
        # import placed after the class definition.
        import statistics

        if not self.samples:
            return {'error': 'No samples collected'}

        # Calculate statistics
        jitter_values = [s['jitter'] for s in self.samples]
        packet_loss_values = [s['packet_loss_rate'] for s in self.samples]
        cycle_time_values = [s['mean_cycle_time'] for s in self.samples]

        total_samples = len(self.samples)
        healthy_samples = sum(1 for s in self.samples if s['is_healthy'])
        unhealthy_samples = total_samples - healthy_samples

        return {
            'test_info': {
                'config_file': self.config_file,
                'duration_hours': self.duration_hours,
                'start_time': datetime.datetime.fromtimestamp(self.start_time).isoformat(),
                'end_time': datetime.datetime.fromtimestamp(self.end_time).isoformat(),
                'actual_duration_hours': (self.end_time - self.start_time) / 3600,
                'total_samples': total_samples,
            },
            'health_summary': {
                'healthy_samples': healthy_samples,
                'unhealthy_samples': unhealthy_samples,
                'health_percentage': (healthy_samples / total_samples) * 100,
            },
            'timing_stats': {
                'mean_cycle_time_ms': statistics.mean(cycle_time_values) * 1000,
                'min_cycle_time_ms': min(cycle_time_values) * 1000,
                'max_cycle_time_ms': max(cycle_time_values) * 1000,
                'mean_jitter_ms': statistics.mean(jitter_values) * 1000,
                'max_jitter_ms': max(jitter_values) * 1000,
            },
            'network_stats': {
                'mean_packet_loss_percent': statistics.mean(packet_loss_values),
                'max_packet_loss_percent': max(packet_loss_values),
            },
            'final_metrics': self.samples[-1],
        }

    def _print_summary(self, report: dict) -> None:
        """Print human-readable summary (or the error when no data exists)."""
        print("\n" + "=" * 60)
        print("STABILITY TEST SUMMARY")
        print("=" * 60)

        # _generate_report() returns only an 'error' key when nothing was sampled
        if 'error' in report:
            print(f"\nNo summary available: {report['error']}")
            print("=" * 60)
            return

        info = report['test_info']
        health = report['health_summary']
        timing = report['timing_stats']
        network = report['network_stats']

        print(f"\nTest Duration: {info['actual_duration_hours']:.2f} hours")
        print(f"Total Samples: {info['total_samples']}")

        print(f"\nHealth: {health['health_percentage']:.1f}% healthy")
        print(f"  Healthy samples: {health['healthy_samples']}")
        print(f"  Unhealthy samples: {health['unhealthy_samples']}")

        print("\nTiming Performance:")
        print(f"  Mean cycle time: {timing['mean_cycle_time_ms']:.2f}ms")
        print(f"  Cycle time range: {timing['min_cycle_time_ms']:.2f} - {timing['max_cycle_time_ms']:.2f}ms")
        print(f"  Mean jitter: {timing['mean_jitter_ms']:.2f}ms")
        print(f"  Max jitter: {timing['max_jitter_ms']:.2f}ms")

        print("\nNetwork Quality:")
        print(f"  Mean packet loss: {network['mean_packet_loss_percent']:.3f}%")
        print(f"  Max packet loss: {network['max_packet_loss_percent']:.3f}%")

        health_icon = "✅ PASS" if health['health_percentage'] >= 95 else "⚠️  NEEDS IMPROVEMENT"
        print(f"\nOverall Result: {health_icon}")
        print("=" * 60)


def main():
    """Main entry point: parse CLI arguments and run the stability test."""
    parser = argparse.ArgumentParser(description='RSI 24-Hour Stability Test')
    parser.add_argument(
        '--duration',
        type=float,
        default=24.0,
        help='Test duration in hours (default: 24)'
    )
    parser.add_argument(
        '--config',
        type=str,
        default='RSI_EthernetConfig.xml',
        help='Path to RSI config file'
    )
    parser.add_argument(
        '--output',
        type=str,
        default=None,
        help='Output JSON file (default: stability_test_TIMESTAMP.json)'
    )
    parser.add_argument(
        '--interval',
        type=float,
        default=60.0,
        help='Check interval in seconds (default: 60)'
    )

    args = parser.parse_args()

    # Reject values that would make the run loop meaningless or make
    # time.sleep() raise on a negative interval.
    if args.duration <= 0:
        parser.error('--duration must be greater than 0')
    if args.interval <= 0:
        parser.error('--interval must be greater than 0')

    # Generate default output filename if not specified
    if args.output is None:
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        args.output = f'stability_test_{timestamp}.json'

    # Run test
    test = StabilityTest(
        config_file=args.config,
        duration_hours=args.duration,
        output_file=args.output,
        check_interval=args.interval
    )

    test.setup()
    test.run()


if __name__ == '__main__':
    main()