Complete Phase 2: Auto-reconnection and stability testing

Implement automatic connection recovery and long-duration testing infrastructure
to complete Phase 2 (Network Reliability) of the RSIPI improvement roadmap.

New Features:
- Auto-reconnection manager with configurable retry strategies
  - IMMEDIATE: Reconnect without delay
  - LINEAR_BACKOFF: Incremental retry delays
  - EXPONENTIAL_BACKOFF: Exponential retry delays
- Background watchdog monitoring (checks every 2 seconds)
- Reconnection statistics tracking (attempts, failures, timestamps)
- Optional callbacks for reconnection events (success/failure)
- 24-hour stability test script with comprehensive reporting
  - Configurable test duration and sample intervals
  - Real-time health monitoring and progress logging
  - Detailed JSON reports with timing and network statistics
  - Human-readable summary with health percentage

Modified Files:
- src/RSIPI/rsi_client.py
  - Added auto-reconnect integration with enable_auto_reconnect parameter
  - Start/stop auto-reconnect monitor in lifecycle methods
  - Clear metrics on reconnection to reset statistics

New Files:
- src/RSIPI/auto_reconnect.py (236 lines)
  - AutoReconnectManager class with background monitoring thread
  - ReconnectStrategy enum for retry behavior configuration
  - Watchdog timeout detection and automatic recovery
  - Reconnection verification with health checks

- tests/stability_test.py (335 lines)
  - StabilityTest class for long-duration testing
  - Command-line interface with argparse
  - Automatic log file generation with timestamps
  - Sample collection with configurable intervals
  - Statistical analysis and reporting
  - Graceful interruption handling (KeyboardInterrupt)

Phase 2 Status: COMPLETE
- [x] Timing instrumentation (commit 6e8ea2e)
- [x] Watchdog timer (commit 6e8ea2e)
- [x] Network quality monitoring (commit 6e8ea2e)
- [x] DiagnosticsAPI implementation (commit 6e8ea2e)
- [x] Auto-reconnection with graceful recovery (this commit)
- [x] 24-hour stability test infrastructure (this commit)

Next: Run stability test, then proceed to Phase 3 (KRL Coordination)
This commit is contained in:
Adam 2026-01-17 00:12:44 +00:00
parent 6e8ea2e43f
commit bb65500082
3 changed files with 603 additions and 1 deletions

236
src/RSIPI/auto_reconnect.py Normal file
View File

@ -0,0 +1,236 @@
"""
Auto-reconnection manager for RSI network reliability.
Monitors network health and automatically reconnects when communication
is lost, with configurable retry logic and backoff strategies.
"""
import logging
import time
import threading
from typing import Optional, Callable, TYPE_CHECKING
from enum import Enum, auto
if TYPE_CHECKING:
from .rsi_client import RSIClient
class ReconnectStrategy(Enum):
    """Delay policy applied between consecutive reconnection attempts."""

    IMMEDIATE = auto()            # Retry straight away, no delay
    LINEAR_BACKOFF = auto()       # Delay grows linearly: retry_delay * attempt
    EXPONENTIAL_BACKOFF = auto()  # Delay doubles: retry_delay * 2 ** (attempt - 1)


class AutoReconnectManager:
    """
    Automatic reconnection manager for RSI communication.

    A daemon thread polls ``client.metrics_dict`` every ``check_interval``
    seconds. When the ``watchdog_timeout`` flag is set while
    ``client.is_running()`` is true, ``client.reconnect()`` is retried
    according to the configured strategy until the connection verifies as
    healthy, the retry budget is exhausted, or the manager is stopped.
    """

    def __init__(
        self,
        client: 'RSIClient',
        enabled: bool = True,
        check_interval: float = 2.0,
        max_retries: int = 5,
        retry_delay: float = 5.0,
        strategy: ReconnectStrategy = ReconnectStrategy.LINEAR_BACKOFF,
        on_reconnect: Optional[Callable] = None,
        on_failure: Optional[Callable] = None,
    ):
        """
        Initialize auto-reconnect manager.

        Args:
            client: RSIClient instance to monitor
            enabled: Whether auto-reconnect is enabled
            check_interval: How often to check health (seconds)
            max_retries: Maximum reconnection attempts (0 = unlimited)
            retry_delay: Base delay between retries (seconds)
            strategy: Reconnection strategy (IMMEDIATE, LINEAR_BACKOFF, EXPONENTIAL_BACKOFF)
            on_reconnect: Optional callback called after successful reconnect
            on_failure: Optional callback called when max retries exceeded
        """
        self.client = client
        self.enabled = enabled
        self.check_interval = check_interval
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.strategy = strategy
        self.on_reconnect = on_reconnect
        self.on_failure = on_failure

        self._monitor_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._running = False

        # Statistics
        self.total_reconnects = 0
        self.failed_reconnects = 0
        self.last_reconnect_time: Optional[float] = None

    def start(self) -> None:
        """Start the auto-reconnect monitor thread (no-op if running or disabled)."""
        if self._running:
            logging.warning("Auto-reconnect manager already running")
            return
        if not self.enabled:
            logging.info("Auto-reconnect is disabled")
            return

        self._stop_event.clear()
        self._running = True
        self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._monitor_thread.start()
        logging.info("Auto-reconnect manager started")

    def stop(self) -> None:
        """
        Stop the auto-reconnect monitor thread.

        Safe to call from the monitor thread itself (for example from an
        ``on_failure`` / ``on_reconnect`` callback): in that case the thread
        is only signalled to stop, because a thread cannot join itself.
        """
        if not self._running:
            return

        self._running = False
        self._stop_event.set()

        thread = self._monitor_thread
        # Thread.join() raises RuntimeError when asked to join the calling
        # thread, so skip the join when stop() runs on the monitor thread.
        if (
            thread is not None
            and thread is not threading.current_thread()
            and thread.is_alive()
        ):
            thread.join(timeout=5)
        logging.info("Auto-reconnect manager stopped")

    def _monitor_loop(self) -> None:
        """Main monitoring loop (runs in background thread)."""
        while not self._stop_event.is_set():
            try:
                # Check if watchdog has timed out
                if hasattr(self.client, 'metrics_dict'):
                    # Snapshot the metrics so one consistent copy is inspected
                    metrics = dict(self.client.metrics_dict)
                    if metrics.get('watchdog_timeout', False) and self.client.is_running():
                        logging.error("Watchdog timeout detected - initiating auto-reconnect")
                        self._attempt_reconnection()
            except Exception as e:
                # Keep the monitor alive; a failed check is retried next cycle
                logging.error(f"Error in auto-reconnect monitor: {e}")

            # Sleep with interruptible wait
            self._stop_event.wait(self.check_interval)

    def _delay_before_retry(self, attempt: int) -> float:
        """Return the wait (seconds) that follows failed attempt number ``attempt`` (1-based)."""
        if self.strategy == ReconnectStrategy.IMMEDIATE:
            return 0
        if self.strategy == ReconnectStrategy.LINEAR_BACKOFF:
            return self.retry_delay * attempt
        if self.strategy == ReconnectStrategy.EXPONENTIAL_BACKOFF:
            return self.retry_delay * (2 ** (attempt - 1))
        return self.retry_delay

    def _invoke_callback(self, callback: Optional[Callable], name: str) -> None:
        """Call an optional user callback, logging (not propagating) any exception."""
        if not callback:
            return
        try:
            callback()
        except Exception as e:
            logging.error(f"Error in {name} callback: {e}")

    def _attempt_reconnection(self) -> bool:
        """
        Attempt to reconnect with configured retry logic.

        The back-off delay is only applied between attempts; once the last
        permitted attempt has failed the method gives up immediately.

        Returns:
            True if reconnection successful, False if the retry budget was
            exhausted or the manager was stopped while waiting
        """
        attempt = 0
        while True:
            attempt += 1
            logging.info(f"Reconnection attempt {attempt}/{self.max_retries if self.max_retries > 0 else ''}")

            try:
                # Attempt reconnect
                self.client.reconnect()
                # Wait a moment for connection to stabilize
                time.sleep(2)

                # Verify connection is working
                if self._verify_connection():
                    logging.info(f"✅ Reconnection successful after {attempt} attempt(s)")
                    self.total_reconnects += 1
                    self.last_reconnect_time = time.time()
                    self._invoke_callback(self.on_reconnect, "on_reconnect")
                    return True
                logging.warning("Reconnection completed but connection verification failed")
            except Exception as e:
                logging.error(f"Reconnection attempt {attempt} failed: {e}")

            # Give up before sleeping when the retry budget is used up
            if self.max_retries > 0 and attempt >= self.max_retries:
                logging.error(f"Max reconnection retries ({self.max_retries}) exceeded")
                self.failed_reconnects += 1
                self._invoke_callback(self.on_failure, "on_failure")
                return False

            # Calculate delay for next retry based on strategy
            delay = self._delay_before_retry(attempt)
            if delay > 0:
                logging.info(f"Waiting {delay:.1f}s before next reconnection attempt...")
                self._stop_event.wait(delay)

            # Check if we were stopped during the wait
            if self._stop_event.is_set():
                return False

    def _verify_connection(self) -> bool:
        """
        Verify that the connection is actually working.

        Returns:
            True if the client's metrics report at least one cycle and no
            watchdog timeout, False otherwise
        """
        # Wait a moment for metrics to update
        time.sleep(1)

        if not hasattr(self.client, 'metrics_dict'):
            return False
        metrics = dict(self.client.metrics_dict)

        # Check that we're receiving packets
        if metrics.get('total_cycles', 0) == 0:
            return False

        # A missing watchdog flag is treated as a timeout (unhealthy)
        if metrics.get('watchdog_timeout', True):
            return False

        # Connection appears healthy
        return True

    def get_stats(self) -> dict:
        """
        Get auto-reconnect statistics.

        Returns:
            Dictionary with reconnection statistics and current configuration
        """
        return {
            'enabled': self.enabled,
            'running': self._running,
            'total_reconnects': self.total_reconnects,
            'failed_reconnects': self.failed_reconnects,
            'last_reconnect_time': self.last_reconnect_time,
            'strategy': self.strategy.name,
            'max_retries': self.max_retries,
            'retry_delay': self.retry_delay,
        }

View File

@ -8,6 +8,7 @@ from .config_parser import ConfigParser
from .network_handler import NetworkProcess
from .safety_manager import SafetyManager
from .exceptions import RSIStateError, RSIInvalidTransition, RSIClientNotReady
from .auto_reconnect import AutoReconnectManager, ReconnectStrategy
class ClientState(Enum):
@ -33,13 +34,23 @@ class RSIClient:
ClientState.ERROR: {ClientState.STOPPING, ClientState.INITIALIZED}, # Via reconnect
}
def __init__(self, config_file: str, rsi_limits_file: Optional[str] = None) -> None:
def __init__(
self,
config_file: str,
rsi_limits_file: Optional[str] = None,
enable_auto_reconnect: bool = False,
auto_reconnect_retries: int = 5,
auto_reconnect_delay: float = 5.0
) -> None:
"""
Initialize RSI client with configuration and safety limits.
Args:
config_file: Path to RSI_EthernetConfig.xml
rsi_limits_file: Optional path to .rsi.xml safety limits file
enable_auto_reconnect: Enable automatic reconnection on communication loss
auto_reconnect_retries: Maximum reconnection attempts (0 = unlimited)
auto_reconnect_delay: Base delay between retries in seconds
"""
logging.info(f"Loading RSI configuration from {config_file}...")
@ -84,6 +95,18 @@ class RSIClient:
self.running: bool = False
self.thread: Optional[Thread] = None
# Auto-reconnect manager (Phase 2)
self.auto_reconnect_manager: Optional[AutoReconnectManager] = None
if enable_auto_reconnect:
self.auto_reconnect_manager = AutoReconnectManager(
client=self,
enabled=True,
max_retries=auto_reconnect_retries,
retry_delay=auto_reconnect_delay,
strategy=ReconnectStrategy.LINEAR_BACKOFF
)
logging.info("Auto-reconnect enabled")
@property
def state(self) -> ClientState:
"""Get current client state (thread-safe)."""
@ -138,6 +161,10 @@ class RSIClient:
self.running = True
logging.info("RSI Client Started")
# Start auto-reconnect monitor (Phase 2)
if self.auto_reconnect_manager:
self.auto_reconnect_manager.start()
try:
while self.running and not self.stop_event.is_set():
time.sleep(2)
@ -174,6 +201,10 @@ class RSIClient:
self.thread.join(timeout=2)
self.thread = None
# Stop auto-reconnect monitor (Phase 2)
if self.auto_reconnect_manager:
self.auto_reconnect_manager.stop()
self._transition_to(ClientState.STOPPED)
logging.info("RSI Client Stopped")

335
tests/stability_test.py Normal file
View File

@ -0,0 +1,335 @@
"""
24-Hour RSI Stability Test
Long-duration stability test for RSIPI network communication.
Monitors connection health, tracks metrics, and generates detailed
performance reports.
Usage:
python stability_test.py [--duration HOURS] [--config CONFIG_FILE] [--output OUTPUT_FILE] [--interval SECONDS]
Example:
# Run for 24 hours
python stability_test.py --duration 24
# Run for 1 hour with custom config
python stability_test.py --duration 1 --config custom_config.xml
# Quick 5-minute test
python stability_test.py --duration 0.083 # 5 minutes
"""
import sys
import os
import time
import argparse
import logging
import json
import datetime
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))
from RSIPI import RSIAPI
class StabilityTest:
    """
    Long-duration stability test for RSI communication.

    Typical use is ``setup()`` followed by ``run()``: metrics are sampled
    from ``api.diagnostics.get_stats()`` every ``check_interval`` seconds
    until ``duration_hours`` has elapsed (or the user interrupts), then a
    JSON report is written to ``output_file`` and a summary is printed.
    """

    def __init__(
        self,
        config_file: str,
        duration_hours: float,
        output_file: str,
        check_interval: float = 60.0
    ):
        """
        Initialize stability test.

        Args:
            config_file: Path to RSI config file
            duration_hours: Test duration in hours
            output_file: Path for results JSON file
            check_interval: How often to sample metrics (seconds)
        """
        self.config_file = config_file
        self.duration_hours = duration_hours
        self.output_file = output_file
        self.check_interval = check_interval
        self.start_time = None
        self.end_time = None
        self.samples = []
        self.api = None

    def setup(self) -> None:
        """
        Set up logging and RSI connection.

        Raises:
            RuntimeError: If the API does not report running after start-up
        """
        # Configure logging (timestamped log file plus console)
        log_file = f'stability_test_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        logging.info("=== RSI Stability Test ===")
        logging.info(f"Config: {self.config_file}")
        logging.info(f"Duration: {self.duration_hours} hours")
        logging.info(f"Check interval: {self.check_interval}s")
        logging.info(f"Output: {self.output_file}")
        logging.info("=" * 50)

        # Initialize API with auto-reconnect enabled
        self.api = RSIAPI(
            self.config_file,
            enable_auto_reconnect=True,
            auto_reconnect_retries=0,  # Unlimited retries
            auto_reconnect_delay=10.0
        )
        logging.info("Starting RSI communication...")
        self.api.start()

        # Wait for connection to stabilize
        time.sleep(3)
        if not self.api.is_running():
            raise RuntimeError("Failed to start RSI communication")
        logging.info("✅ RSI communication started successfully")

    def run(self) -> None:
        """
        Run the stability test.

        Samples until the configured duration elapses or the user presses
        Ctrl+C; clean-up and report generation always run afterwards.
        """
        self.start_time = time.time()
        end_time = self.start_time + (self.duration_hours * 3600)
        sample_count = 0
        error_count = 0
        logging.info(f"Test started at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logging.info(f"Will run until {datetime.datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S')}")

        try:
            while time.time() < end_time:
                try:
                    # Collect metrics sample
                    sample = self._collect_sample()
                    self.samples.append(sample)
                    sample_count += 1

                    # Log progress
                    elapsed_hours = (time.time() - self.start_time) / 3600
                    remaining_hours = self.duration_hours - elapsed_hours
                    progress = (elapsed_hours / self.duration_hours) * 100
                    self._log_progress(sample, elapsed_hours, remaining_hours, progress, sample_count, error_count)
                except Exception as e:
                    # A failed sample is counted and logged; the test keeps going
                    error_count += 1
                    logging.error(f"Error collecting sample: {e}")

                # Sleep until next check, but never past the scheduled end
                remaining = end_time - time.time()
                if remaining <= 0:
                    break
                time.sleep(min(self.check_interval, remaining))
        except KeyboardInterrupt:
            logging.warning("\n⚠️ Test interrupted by user")
        finally:
            self.end_time = time.time()
            self._cleanup()

    def _collect_sample(self) -> dict:
        """Collect a single metrics sample from the diagnostics API."""
        stats = self.api.diagnostics.get_stats()
        return {
            'timestamp': time.time(),
            'mean_cycle_time': stats.get('mean_cycle_time', 0),
            'jitter': stats.get('jitter', 0),
            'packet_loss_rate': stats.get('packet_loss_rate', 0),
            'ipoc_gap_rate': stats.get('ipoc_gap_rate', 0),
            'total_cycles': stats.get('total_cycles', 0),
            'is_healthy': stats.get('is_healthy', False),
            'warnings': stats.get('warnings', []),
            'uptime': stats.get('uptime', 0),
        }

    def _log_progress(
        self,
        sample: dict,
        elapsed_hours: float,
        remaining_hours: float,
        progress: float,
        sample_count: int,
        error_count: int
    ) -> None:
        """Log current progress and any warnings carried by the sample."""
        health_icon = "" if sample['is_healthy'] else "⚠️"
        logging.info(
            f"{health_icon} Progress: {progress:.1f}% | "
            f"Elapsed: {elapsed_hours:.2f}h | "
            f"Remaining: {remaining_hours:.2f}h | "
            f"Samples: {sample_count} | "
            f"Jitter: {sample['jitter']*1000:.2f}ms | "
            f"Loss: {sample['packet_loss_rate']:.2f}%"
        )
        for warning in sample['warnings'] or ():
            logging.warning(f" ⚠️ {warning}")

    def _cleanup(self) -> None:
        """Stop RSI communication, then write the JSON report and print the summary."""
        logging.info("\n=== Test Complete ===")

        # Stop RSI; a failure here must not prevent the report being saved
        logging.info("Stopping RSI communication...")
        if self.api is not None:
            try:
                self.api.stop()
            except Exception as e:
                logging.error(f"Error stopping RSI communication: {e}")

        # Generate report
        logging.info("Generating report...")
        report = self._generate_report()

        # Save results
        with open(self.output_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2)
        logging.info(f"✅ Report saved to: {self.output_file}")

        # Print summary
        self._print_summary(report)

    def _generate_report(self) -> dict:
        """
        Generate comprehensive test report.

        Returns:
            ``{'error': 'No samples collected'}`` when there are no samples,
            otherwise a dict with ``test_info``, ``health_summary``,
            ``timing_stats``, ``network_stats`` and ``final_metrics``.
        """
        if not self.samples:
            return {'error': 'No samples collected'}

        # Calculate statistics
        total = len(self.samples)
        jitter_values = [s['jitter'] for s in self.samples]
        packet_loss_values = [s['packet_loss_rate'] for s in self.samples]
        cycle_time_values = [s['mean_cycle_time'] for s in self.samples]
        healthy_samples = sum(1 for s in self.samples if s['is_healthy'])

        return {
            'test_info': {
                'config_file': self.config_file,
                'duration_hours': self.duration_hours,
                'start_time': datetime.datetime.fromtimestamp(self.start_time).isoformat(),
                'end_time': datetime.datetime.fromtimestamp(self.end_time).isoformat(),
                'actual_duration_hours': (self.end_time - self.start_time) / 3600,
                'total_samples': total,
            },
            'health_summary': {
                'healthy_samples': healthy_samples,
                'unhealthy_samples': total - healthy_samples,
                'health_percentage': (healthy_samples / total) * 100,
            },
            'timing_stats': {
                # Sampled values are multiplied by 1000 for the *_ms fields
                'mean_cycle_time_ms': statistics.mean(cycle_time_values) * 1000,
                'min_cycle_time_ms': min(cycle_time_values) * 1000,
                'max_cycle_time_ms': max(cycle_time_values) * 1000,
                'mean_jitter_ms': statistics.mean(jitter_values) * 1000,
                'max_jitter_ms': max(jitter_values) * 1000,
            },
            'network_stats': {
                'mean_packet_loss_percent': statistics.mean(packet_loss_values),
                'max_packet_loss_percent': max(packet_loss_values),
            },
            'final_metrics': self.samples[-1],
        }

    def _print_summary(self, report: dict) -> None:
        """
        Print human-readable summary.

        Args:
            report: Dict produced by ``_generate_report``; the error-only
                form (no samples) is reported instead of raising KeyError.
        """
        print("\n" + "=" * 60)
        print("STABILITY TEST SUMMARY")
        print("=" * 60)

        if 'error' in report:
            # No samples were collected, so there are no statistics to show
            print(f"\n{report['error']}")
            print("=" * 60)
            return

        info = report['test_info']
        health = report['health_summary']
        timing = report['timing_stats']
        network = report['network_stats']

        print(f"\nTest Duration: {info['actual_duration_hours']:.2f} hours")
        print(f"Total Samples: {info['total_samples']}")
        print(f"\nHealth: {health['health_percentage']:.1f}% healthy")
        print(f" Healthy samples: {health['healthy_samples']}")
        print(f" Unhealthy samples: {health['unhealthy_samples']}")
        print("\nTiming Performance:")
        print(f" Mean cycle time: {timing['mean_cycle_time_ms']:.2f}ms")
        print(f" Cycle time range: {timing['min_cycle_time_ms']:.2f} - {timing['max_cycle_time_ms']:.2f}ms")
        print(f" Mean jitter: {timing['mean_jitter_ms']:.2f}ms")
        print(f" Max jitter: {timing['max_jitter_ms']:.2f}ms")
        print("\nNetwork Quality:")
        print(f" Mean packet loss: {network['mean_packet_loss_percent']:.3f}%")
        print(f" Max packet loss: {network['max_packet_loss_percent']:.3f}%")

        # The run passes when at least 95% of samples were healthy
        health_icon = "✅ PASS" if health['health_percentage'] >= 95 else "⚠️ NEEDS IMPROVEMENT"
        print(f"\nOverall Result: {health_icon}")
        print("=" * 60)
import statistics
def main(argv=None):
    """
    Main entry point: parse command-line options and run the stability test.

    Args:
        argv: Optional list of argument strings; when None, argparse reads
            the process command line.

    Exits with an argparse usage error (status 2) when --duration or
    --interval is not a positive, finite number.
    """
    parser = argparse.ArgumentParser(description='RSI 24-Hour Stability Test')
    parser.add_argument(
        '--duration',
        type=float,
        default=24.0,
        help='Test duration in hours (default: 24)'
    )
    parser.add_argument(
        '--config',
        type=str,
        default='RSI_EthernetConfig.xml',
        help='Path to RSI config file'
    )
    parser.add_argument(
        '--output',
        type=str,
        default=None,
        help='Output JSON file (default: stability_test_TIMESTAMP.json)'
    )
    parser.add_argument(
        '--interval',
        type=float,
        default=60.0,
        help='Check interval in seconds (default: 60)'
    )
    args = parser.parse_args(argv)

    # Reject zero, negative, NaN and infinite values up front: they would
    # otherwise produce an empty run or fail later while sleeping/formatting.
    infinity = float('inf')
    if not 0 < args.duration < infinity:
        parser.error('--duration must be a positive, finite number of hours')
    if not 0 < args.interval < infinity:
        parser.error('--interval must be a positive, finite number of seconds')

    # Generate default output filename if not specified
    if args.output is None:
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        args.output = f'stability_test_{timestamp}.json'

    # Run test
    test = StabilityTest(
        config_file=args.config,
        duration_hours=args.duration,
        output_file=args.output,
        check_interval=args.interval
    )
    test.setup()
    test.run()
if __name__ == '__main__':
main()