Compare commits

...

2 Commits

12 changed files with 448 additions and 626 deletions

View File

@ -1,3 +1,4 @@
import logging
import xml.etree.ElementTree as ET
class ConfigParser:
@ -70,9 +71,6 @@ class ConfigParser:
print(f"[WARNING] Failed to load .rsi.xml safety limits: {e}")
self.safety_limits = {}
print(f"✅ Final Send Variables: {self.send_variables}")
print(f"✅ Final Receive Variables: {self.receive_variables}")
def process_config(self):
"""
Parses the RSI config file and builds the send/receive variable dictionaries.
@ -90,7 +88,7 @@ class ConfigParser:
# Extract <CONFIG> section for IP/port/etc.
config = root.find("CONFIG")
if config is None:
raise ValueError("Missing <CONFIG> section in RSI_EthernetConfig.xml")
raise ValueError("Missing <CONFIG> section in RSI_EthernetConfig.xml")
self.network_settings = {
"ip": config.find("IP_NUMBER").text.strip() if config.find("IP_NUMBER") is not None else None,
@ -102,7 +100,7 @@ class ConfigParser:
print(f"✅ Loaded network settings: {self.network_settings}")
if None in self.network_settings.values():
raise ValueError("Missing one or more required network settings (ip, port, sentype, onlysend)")
raise ValueError("Missing one or more required network settings (ip, port, sentype, onlysend)")
# Parse SEND section
send_section = root.find("SEND/ELEMENTS")
@ -123,7 +121,7 @@ class ConfigParser:
return send_vars, receive_vars
except Exception as e:
print(f"Error processing config file: {e}")
logging.error(f"Error processing config file: {e}")
return {}, {}
def process_variable_structure(self, var_dict, tag, var_type, indx=""):
@ -138,24 +136,19 @@ class ConfigParser:
"""
tag = tag.replace("DEF_", "") # Remove DEF_ prefix if present
print(f"🔍 Assigning {tag}: INDX={indx}, TYPE={var_type}")
if tag in self.internal_structure:
# If pre-defined internally, copy structure
internal_value = self.internal_structure[tag]
var_dict[tag] = internal_value.copy() if isinstance(internal_value, dict) else internal_value
print(f"✅ INTERNAL Match: {tag} -> {var_dict[tag]}")
elif "." in tag:
# Handle nested dictionary e.g. Tech.T21 -> { 'Tech': { 'T21': 0.0 } }
parent, subkey = tag.split(".", 1)
if parent not in var_dict:
var_dict[parent] = {}
var_dict[parent][subkey] = self.get_default_value(var_type)
print(f"📂 Assigned '{tag}' as nested dictionary under '{parent}': {var_dict[parent]}")
else:
# Standard single-value variable
var_dict[tag] = self.get_default_value(var_type)
print(f"📄 Assigned Standard Value: '{tag}' -> {var_dict[tag]}")
@staticmethod
def rename_tech_keys(var_dict):
@ -171,7 +164,6 @@ class ConfigParser:
tech_data.update(var_dict.pop(key))
if tech_data:
var_dict["Tech"] = tech_data
print(f"✅ Renamed Tech.XX keys to 'Tech': {var_dict['Tech']}")
@staticmethod
def get_default_value(var_type):

View File

@ -1,4 +1,5 @@
import csv
import logging
import re
from collections import OrderedDict
@ -28,8 +29,6 @@ class KRLParser:
if label not in self.labels_to_extract:
self.labels_to_extract.append(label)
print(f"📌 Found labels in .src: {self.labels_to_extract}")
def parse_dat(self):
"""
Parses the .dat file and retrieves Cartesian coordinates for each label.
@ -58,8 +57,6 @@ class KRLParser:
self.positions[label] = coords
print(f"📥 Parsed {len(self.positions)} positions from .dat")
def export_csv(self, output_file):
"""
Writes the extracted Cartesian positions into a structured CSV file,
@ -90,9 +87,9 @@ class KRLParser:
})
sequence_number += 1
else:
print(f"⚠️ Skipped missing/deleted point: {label}")
logging.warning(f"Skipped missing/deleted point: {label}")
print(f"CSV exported successfully to {output_file} with {sequence_number} points.")
logging.info(f"CSV exported successfully to {output_file} with {sequence_number} points.")
# Optional CLI usage

View File

@ -152,9 +152,9 @@ if __name__ == "__main__":
if args.limits:
from src.RSIPI.rsi_limit_parser import parse_rsi_limits
limits = parse_rsi_limits(args.limits)
visualiser = Kukarsivisualiser(args.csv_file, safety_limits=limits)
visualiser = KukaRSIVisualiser(args.csv_file, safety_limits=limits)
else:
visualiser = Kukarsivisualiser(args.csv_file)
visualiser = KukaRSIVisualiser(args.csv_file)
visualiser.plot_trajectory()
visualiser.plot_joint_positions()

130
src/RSIPI/live_plotter.py Normal file
View File

@ -0,0 +1,130 @@
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from collections import deque
from threading import Thread, Lock
import time
class LivePlotter:
def __init__(self, client, mode="3d", interval=100):
self.client = client
self.mode = mode
self.interval = interval
self.running = False
# Plot data buffers
self.time_data = deque(maxlen=500)
self.position_data = {k: deque(maxlen=500) for k in ["X", "Y", "Z"]}
self.velocity_data = {k: deque(maxlen=500) for k in ["X", "Y", "Z"]}
self.acceleration_data = {k: deque(maxlen=500) for k in ["X", "Y", "Z"]}
self.joint_data = {f"A{i}": deque(maxlen=500) for i in range(1, 7)}
self.force_data = {f"A{i}": deque(maxlen=500) for i in range(1, 7)}
self.previous_positions = {"X": 0, "Y": 0, "Z": 0}
self.previous_velocities = {"X": 0, "Y": 0, "Z": 0}
self.previous_time = time.time()
self.lock = Lock()
self.collector_thread = None
self.fig = plt.figure()
self.ax = self.fig.add_subplot(111, projection="3d" if self.mode == "3d" else None)
def start(self):
self.running = True
self.collector_thread = Thread(target=self.collect_data_loop, daemon=True)
self.collector_thread.start()
self.ani = animation.FuncAnimation(self.fig, self.update_plot, interval=self.interval)
try:
plt.show()
except RuntimeError:
print("⚠️ Matplotlib GUI interrupted during shutdown.")
self.running = False
def stop(self, save_path: str = None):
self.running = False
if save_path:
try:
self.fig.savefig(save_path, bbox_inches="tight")
print(f"📸 Plot saved to '{save_path}'")
except Exception as e:
print(f"❌ Failed to save plot: {e}")
plt.close(self.fig)
def collect_data_loop(self):
while self.running:
with self.lock:
current_time = time.time()
dt = current_time - self.previous_time
self.previous_time = current_time
self.time_data.append(current_time)
position = self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0})
joints = self.client.receive_variables.get("AIPos", {f"A{i}": 0 for i in range(1, 7)})
force = self.client.receive_variables.get("MaCur", {f"A{i}": 0 for i in range(1, 7)})
for axis in ["X", "Y", "Z"]:
vel = (position[axis] - self.previous_positions[axis]) / dt if dt > 0 else 0
acc = (vel - self.previous_velocities[axis]) / dt if dt > 0 else 0
self.previous_positions[axis] = position[axis]
self.previous_velocities[axis] = vel
self.position_data[axis].append(position[axis])
self.velocity_data[axis].append(vel)
self.acceleration_data[axis].append(acc)
for i in range(1, 7):
self.joint_data[f"A{i}"].append(joints.get(f"A{i}", 0))
self.force_data[f"A{i}"].append(force.get(f"A{i}", 0))
time.sleep(self.interval / 1000.0)
def update_plot(self, frame):
if not self.running:
return
with self.lock:
self.ax.clear()
self.render_plot()
def render_plot(self):
if self.mode == "3d":
self.ax.set_title("Live 3D TCP Trajectory")
self.ax.plot(self.position_data["X"], self.position_data["Y"], self.position_data["Z"], label="TCP Path")
self.ax.set_xlabel("X")
self.ax.set_ylabel("Y")
self.ax.set_zlabel("Z")
elif self.mode == "2d_xy":
self.ax.set_title("Live 2D Trajectory (X-Y)")
self.ax.plot(self.position_data["X"], self.position_data["Y"], label="XY Path")
self.ax.set_xlabel("X")
self.ax.set_ylabel("Y")
elif self.mode == "velocity":
self.ax.set_title("Live TCP Velocity")
self.ax.plot(self.time_data, self.velocity_data["X"], label="dX/dt")
self.ax.plot(self.time_data, self.velocity_data["Y"], label="dY/dt")
self.ax.plot(self.time_data, self.velocity_data["Z"], label="dZ/dt")
self.ax.set_ylabel("Velocity [mm/s]")
elif self.mode == "acceleration":
self.ax.set_title("Live TCP Acceleration")
self.ax.plot(self.time_data, self.acceleration_data["X"], label="d²X/dt²")
self.ax.plot(self.time_data, self.acceleration_data["Y"], label="d²Y/dt²")
self.ax.plot(self.time_data, self.acceleration_data["Z"], label="d²Z/dt²")
self.ax.set_ylabel("Acceleration [mm/s²]")
elif self.mode == "joints":
self.ax.set_title("Live Joint Angles")
for j, values in self.joint_data.items():
self.ax.plot(self.time_data, values, label=j)
self.ax.set_ylabel("Angle [deg]")
elif self.mode == "force":
self.ax.set_title("Live Motor Currents")
for j, values in self.force_data.items():
self.ax.plot(self.time_data, values, label=j)
self.ax.set_ylabel("Current [Nm]")
self.ax.set_xlabel("Time")
self.ax.legend()
self.ax.grid(True)
self.fig.tight_layout()
def change_mode(self, mode):
self.mode = mode
self.ax = self.fig.add_subplot(111, projection="3d" if mode == "3d" else None)

View File

@ -1,16 +1,19 @@
from src.RSIPI.rsi_api import RSIAPI
from src.RSIPI.live_plotter import LivePlotter
import math
import time
import threading
def main():
# Step 1: Create API instance
api = RSIAPI("RSI_EthernetConfig.xml")
time.sleep(10)
# Step 2: Start RSI connection
print("🔌 Starting RSI client...")
api.start_rsi()
time.sleep(10)
# Step 10: Stop RSI connection
print("🛑 Stopping RSI client...")
api.stop_rsi()
print("✅ All safety methods tested successfully.")
print(api.compare_test_runs("25-04-2025_16-33-47.csv", "25-04-2025_20-57-59.csv"))
if __name__ == "__main__":
main()

View File

@ -1,81 +1,73 @@
import multiprocessing
import socket
import time
import csv
import logging
import xml.etree.ElementTree as ET # ✅ FIX: Import ElementTree
import xml.etree.ElementTree as ET
from .xml_handler import XMLGenerator
from .safety_manager import SafetyManager
class NetworkProcess(multiprocessing.Process):
"""Handles UDP communication and optional CSV logging in a separate process."""
def __init__(self, ip, port, send_variables, receive_variables, stop_event, config_parser):
def __init__(self, ip, port, send_variables, receive_variables, stop_event, config_parser, start_event):
super().__init__()
self.send_variables = send_variables
self.receive_variables = receive_variables
self.stop_event = stop_event
self.start_event = start_event # ✅ NEW
self.config_parser = config_parser
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.safety_manager = SafetyManager(config_parser.safety_limits)
self.client_address = (ip, port)
if not self.is_valid_ip(ip):
logging.warning(f"Invalid IP address '{ip}' detected. Falling back to '0.0.0.0'.")
print(f"⚠️ Invalid IP '{ip}', falling back to '0.0.0.0'.")
self.client_address = ('0.0.0.0', port)
else:
self.client_address = (ip, port)
self.logging_active = multiprocessing.Value('b', False) # Shared flag for logging
self.log_filename = multiprocessing.Array('c', 256) # Shared memory for filename
self.logging_active = multiprocessing.Value('b', False)
self.log_filename = multiprocessing.Array('c', 256)
self.csv_process = None
self.controller_ip_and_port = None
def run(self):
"""Start the network loop."""
self.start_event.wait() # ✅ Wait until RSIClient sends start signal
try:
if not self.is_valid_ip(self.client_address[0]):
logging.warning(f"Invalid IP address '{self.client_address[0]}'. Falling back to '0.0.0.0'.")
self.client_address = ('0.0.0.0', self.client_address[1])
self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.udp_socket.bind(self.client_address)
logging.info(f"✅ Network process initialized on {self.client_address}")
logging.info(f"✅ Network process bound on {self.client_address}")
except OSError as e:
logging.error(f"❌ Failed to bind to {self.client_address}: {e}")
raise
self.controller_ip_and_port = None
@staticmethod
def is_valid_ip(ip):
"""Check if an IP address is valid and can be bound on this machine."""
try:
socket.inet_aton(ip) # Validate format
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.bind((ip, 0)) # Try binding
return True
except (socket.error, OSError):
return False
def run(self):
"""Start the network loop."""
print("[DEBUG] Network process started.")
while not self.stop_event.is_set():
try:
self.udp_socket.settimeout(5)
data_received, self.controller_ip_and_port = self.udp_socket.recvfrom(1024)
#print("Receive: ", data_received)
message = data_received.decode()
self.process_received_data(message)
#print("Network :", self.send_variables)
send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings)
#print("Send:", send_xml)
self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port)
# ✅ If logging is active, write data to CSV
if self.logging_active.value:
self.log_to_csv()
except socket.timeout:
print("[WARNING] No message received within timeout period.")
logging.error("[WARNING] No message received within timeout period.")
except Exception as e:
print(f"[ERROR] Network process error: {e}")
logging.error(f"[ERROR] Network process error: {e}")
@staticmethod
def is_valid_ip(ip):
try:
socket.inet_aton(ip)
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.bind((ip, 0))
return True
except (socket.error, OSError):
return False
def process_received_data(self, xml_string):
try:
@ -86,87 +78,9 @@ class NetworkProcess(multiprocessing.Process):
self.receive_variables[element.tag] = {k: float(v) for k, v in element.attrib.items()}
else:
self.receive_variables[element.tag] = element.text
# specifically capture IPOC from received message
if element.tag == "IPOC":
received_ipoc = int(element.text)
self.receive_variables["IPOC"] = received_ipoc
self.send_variables["IPOC"] = received_ipoc + 4 # Increment by 4 ms
self.send_variables["IPOC"] = received_ipoc + 4
except Exception as e:
print(f"[ERROR] Error parsing received message: {e}")
def log_to_csv(self):
"""Write send/receive variables to the CSV log with safety flags."""
filename = self.log_filename.value.decode().strip()
if not filename:
return
try:
with open(filename, mode="a", newline="") as file:
writer = csv.writer(file)
# Write header if new
if file.tell() == 0:
headers = ["Timestamp", "IPOC"]
for k, v in self.send_variables.items():
if isinstance(v, dict):
headers += [f"Send.{k}.{subk}" for subk in v.keys()]
else:
headers.append(f"Send.{k}")
for k, v in self.receive_variables.items():
if isinstance(v, dict):
headers += [f"Receive.{k}.{subk}" for subk in v.keys()]
else:
headers.append(f"Receive.{k}")
headers.append("SafetyViolation")
writer.writerow(headers)
# Gather values safely
timestamp = time.strftime("%d-%m-%Y %H:%M:%S")
ipoc = self.receive_variables.get("IPOC", 0)
send_data = []
for k, v in self.send_variables.items():
if isinstance(v, dict):
send_data.extend([v.get(subk, "") for subk in v])
else:
send_data.append(v)
receive_data = []
for k, v in self.receive_variables.items():
if isinstance(v, dict):
receive_data.extend([v.get(subk, "") for subk in v])
else:
receive_data.append(v)
# Safety check
violation = "False"
try:
for var, val in self.send_variables.items():
if isinstance(val, dict):
for sub, subval in val.items():
path = f"{var}.{sub}"
self.safety_manager.validate(path, subval)
else:
self.safety_manager.validate(var, val)
except Exception as e:
violation = str(e)
writer.writerow([timestamp, ipoc] + send_data + receive_data + [violation])
except Exception as e:
print(f"[ERROR] Failed to log data to CSV: {e}")
def start_logging(self, filename):
"""Start logging RSI data to CSV."""
self.logging_active.value = True
self.log_filename.value = filename.encode()
print(f"✅ CSV Logging started: {filename}")
def stop_logging(self):
"""Stop logging RSI data."""
self.logging_active.value = False
print("🛑 CSV Logging stopped.")
def is_logging_active(self):
"""Return logging status."""
return self.logging_active.value
logging.error(f"[ERROR] Error parsing received message: {e}")

View File

@ -1,8 +1,9 @@
import logging
import pandas as pd
import numpy as np
import json
import matplotlib.pyplot as plt
from .rsi_client import RSIClient
from .kuka_visualiser import KukaRSIVisualiser
from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl
@ -22,14 +23,16 @@ class RSIAPI:
"""Initialize RSIAPI with an RSI client instance."""
self.thread = None
self.config_file = config_file
self.client = None # Delay instantiation
self.graph_process = None # Store graphing process
self.client = None
self.graph_process = None
self.graphing_instance = None
self.graph_thread = None#
self.trajectory_queue = []
self.live_plotter = None
self.live_plot_thread = None
self._ensure_client()
def _ensure_client(self):
"""Ensure RSIClient is initialised before use."""
if self.client is None:
@ -37,15 +40,15 @@ class RSIAPI:
self.client = RSIClient(self.config_file)
def start_rsi(self):
self._ensure_client()
self.thread = threading.Thread(target=self.client.start, daemon=True)
self.thread.start()
return "RSI started in background."
return "RSI started in background."
def stop_rsi(self):
"""Stop the RSI client."""
self.client.stop()
return "RSI stopped."
return "RSI stopped."
def generate_report(filename, format_type):
"""
@ -89,9 +92,9 @@ class RSIAPI:
plt.tight_layout()
plt.savefig(output_path)
else:
raise ValueError(f"Unsupported format: {format_type}")
raise ValueError(f"Unsupported format: {format_type}")
return f"Report saved as {output_path}"
return f"Report saved as {output_path}"
def update_variable(self, name, value):
if "." in name:
@ -103,13 +106,13 @@ class RSIAPI:
safe_value = self.client.safety_manager.validate(full_path, float(value))
current[child] = safe_value
self.client.send_variables[parent] = current
return f"Updated {name} to {safe_value}"
return f"Updated {name} to {safe_value}"
else:
raise KeyError(f"Parent variable '{parent}' not found in send_variables")
raise KeyError(f"Parent variable '{parent}' not found in send_variables")
else:
safe_value = self.client.safety_manager.validate(name, float(value))
self.client.send_variables[name] = safe_value
return f"Updated {name} to {safe_value}"
return f"Updated {name} to {safe_value}"
def show_variables(self):
"""Print available variable names in send and receive variables."""
@ -169,7 +172,7 @@ class RSIAPI:
def reconnect(self):
"""Restart the network connection without stopping RSI."""
self.client.reconnect()
return "Network connection restarted."
return "Network connection restarted."
def toggle_digital_io(self, io_group, io_name, state):
"""
@ -246,7 +249,7 @@ class RSIAPI:
str: Status message indicating plot success or failure.
"""
if not os.path.exists(csv_path):
return f"CSV file not found: {csv_path}"
return f"CSV file not found: {csv_path}"
try:
plot_type = plot_type.lower()
@ -272,20 +275,20 @@ class RSIAPI:
StaticPlotter.plot_motor_currents(csv_path)
case "deviation":
if overlay_path is None or not os.path.exists(overlay_path):
return "Deviation plot requires a valid overlay CSV file."
return "Deviation plot requires a valid overlay CSV file."
StaticPlotter.plot_deviation(csv_path, overlay_path)
case _:
return f"Invalid plot type '{plot_type}'. Use one of: 3d, 2d_xy, 2d_xz, 2d_yz, position, velocity, acceleration, joints, force, deviation."
return f"Invalid plot type '{plot_type}'. Use one of: 3d, 2d_xy, 2d_xz, 2d_yz, position, velocity, acceleration, joints, force, deviation."
return f"✅ Plot '{plot_type}' generated successfully."
except Exception as e:
return f"Failed to generate plot '{plot_type}': {str(e)}"
return f"Failed to generate plot '{plot_type}': {str(e)}"
def start_live_plot(self, mode="3d", interval=100):
if self.live_plotter and self.live_plotter.running:
return "⚠️ Live plotting already active."
return "Live plotting already active."
def runner():
self.live_plotter = LivePlotter(self.client, mode=mode, interval=interval)
@ -293,19 +296,19 @@ class RSIAPI:
self.live_plot_thread = Thread(target=runner, daemon=True)
self.live_plot_thread.start()
return f"📈 Live plot started in '{mode}' mode at {interval}ms interval."
return f"Live plot started in '{mode}' mode at {interval}ms interval."
def stop_live_plot(self):
if self.live_plotter and self.live_plotter.running:
self.live_plotter.stop()
return "🛑 Live plotting stopped."
return "⚠️ No live plot is currently running."
return "Live plotting stopped."
return "No live plot is currently running."
def change_live_plot_mode(self, mode):
if self.live_plotter and self.live_plotter.running:
self.live_plotter.change_mode(mode)
return f"🔄 Live plot mode changed to '{mode}'."
return "⚠️ No live plot is active to change mode."
return f"Live plot mode changed to '{mode}'."
return "No live plot is active to change mode."
@ -313,7 +316,7 @@ class RSIAPI:
def enable_alerts(self, enable):
"""Enable or disable real-time alerts."""
self.client.enable_alerts(enable)
return f"Alerts {'enabled' if enable else 'disabled'}."
return f"Alerts {'enabled' if enable else 'disabled'}."
def override_safety(self, enabled: bool):
self.client.safety_manager.override_safety(enabled)
@ -325,8 +328,8 @@ class RSIAPI:
"""Set threshold for deviation or force alerts."""
if alert_type in ["deviation", "force"]:
self.client.set_alert_threshold(alert_type, value)
return f"{alert_type.capitalize()} alert threshold set to {value}"
return "Invalid alert type. Use 'deviation' or 'force'."
return f"{alert_type.capitalize()} alert threshold set to {value}"
return "Invalid alert type. Use 'deviation' or 'force'."
@staticmethod
def visualise_csv_log(csv_file, export=False):
@ -360,9 +363,9 @@ class RSIAPI:
parser.parse_src()
parser.parse_dat()
parser.export_csv(output_file)
return f"KRL data successfully exported to {output_file}"
return f"KRL data successfully exported to {output_file}"
except Exception as e:
return f"Error parsing KRL files: {e}"
return f"Error parsing KRL files: {e}"
@staticmethod
def inject_rsi(input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"):
@ -377,9 +380,9 @@ class RSIAPI:
try:
inject_rsi_to_krl(input_krl, output_krl, rsi_config)
output_path = output_krl if output_krl else input_krl
return f"RSI successfully injected into {output_path}"
return f"RSI successfully injected into {output_path}"
except Exception as e:
return f"RSI injection failed: {e}"
return f"RSI injection failed: {e}"
@staticmethod
def generate_trajectory(start, end, steps=100, space="cartesian", mode="absolute", include_resets=False):
@ -403,7 +406,7 @@ class RSIAPI:
self.update_joints(**point)
else:
raise ValueError("space must be 'cartesian' or 'joint'")
print(f"🚀 Step {idx + 1}/{len(trajectory)} sent")
print(f"Step {idx + 1}/{len(trajectory)} sent")
await asyncio.sleep(rate)
try:
@ -453,7 +456,7 @@ class RSIAPI:
df = pd.DataFrame(data)
df.to_csv(filename, index=False)
return f"Movement data exported to {filename}"
return f"Movement data exported to {filename}"
@staticmethod
def compare_test_runs(file1, file2):
@ -484,7 +487,7 @@ class RSIAPI:
"""
self._ensure_client()
if "RKorr" not in self.client.send_variables:
print("⚠️ Warning: RKorr not configured in send_variables. Skipping Cartesian update.")
logging.warning("Warning: RKorr not configured in send_variables. Skipping Cartesian update.")
return
for axis, value in kwargs.items():
@ -496,7 +499,7 @@ class RSIAPI:
"""
self._ensure_client()
if "AKorr" not in self.client.send_variables:
print("⚠️ Warning: AKorr not configured in send_variables. Skipping Joint update.")
logging.warning("⚠️ Warning: AKorr not configured in send_variables. Skipping Joint update.")
return
for axis, value in kwargs.items():
@ -510,7 +513,7 @@ class RSIAPI:
import time
import datetime
print("📡 Watching network... Press Ctrl+C to stop.\n")
logging.info("Watching network... Press Ctrl+C to stop.\n")
start_time = time.time()
try:
@ -525,9 +528,7 @@ class RSIAPI:
break
except KeyboardInterrupt:
print("\n🛑 Stopped network watch.")
# --- 🤖 High-level Cartesian and Joint Trajectory Movement ---
logging.info("\nStopped network watch.")
def move_cartesian_trajectory(self, start_pose, end_pose, steps=50, rate=0.012):
"""
@ -558,11 +559,11 @@ class RSIAPI:
Generate and queue a Cartesian movement (no execution).
"""
if not isinstance(start_pose, dict) or not isinstance(end_pose, dict):
raise ValueError("start_pose and end_pose must be dictionaries (e.g., {'X': 0, 'Y': 0, 'Z': 500})")
raise ValueError("start_pose and end_pose must be dictionaries (e.g., {'X': 0, 'Y': 0, 'Z': 500})")
if steps <= 0:
raise ValueError("Steps must be greater than zero.")
raise ValueError("Steps must be greater than zero.")
if rate <= 0:
raise ValueError("Rate must be greater than zero.")
raise ValueError("Rate must be greater than zero.")
trajectory = self.generate_trajectory(start_pose, end_pose, steps=steps, space="cartesian")
self.queue_trajectory(trajectory, "cartesian", rate)
@ -572,11 +573,11 @@ class RSIAPI:
Generate and queue a Joint-space movement (no execution).
"""
if not isinstance(start_joints, dict) or not isinstance(end_joints, dict):
raise ValueError("start_joints and end_joints must be dictionaries (e.g., {'A1': 0, 'A2': 0})")
raise ValueError("start_joints and end_joints must be dictionaries (e.g., {'A1': 0, 'A2': 0})")
if steps <= 0:
raise ValueError("Steps must be greater than zero.")
raise ValueError("Steps must be greater than zero.")
if rate <= 0:
raise ValueError("Rate must be greater than zero.")
raise ValueError("Rate must be greater than zero.")
trajectory = self.generate_trajectory(start_joints, end_joints, steps=steps, space="joint")
self.queue_trajectory(trajectory, "joint", rate)

View File

@ -1,375 +1,196 @@
from .rsi_api import RSIAPI
from .kuka_visualiser import KukaRSIVisualiser
from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl
from RSIPI.rsi_api import RSIAPI
class RSICommandLineInterface:
"""Command-Line Interface for controlling RSI Client."""
def __init__(self, input_config_file):
"""Initialize CLI with an RSI API instance."""
self.client = RSIAPI(input_config_file)
self.running = True
def run(self):
"""Starts the CLI interaction loop."""
print("✅ RSI Command-Line Interface Started. Type 'help' for commands.")
print("RSI Command-Line Interface Started. Type 'help' for commands.")
while self.running:
command = input("RSI> ").strip().lower()
self.process_command(command)
try:
command = input("RSI> ").strip()
self.process_command(command)
except KeyboardInterrupt:
self.exit()
def process_command(self, command):
"""Processes user input commands."""
parts = command.split()
if not parts:
return
cmd = parts[0]
cmd = parts[0].lower()
args = parts[1:]
if cmd == "start":
self.client.start_rsi()
elif cmd == "stop":
self.client.stop_rsi()
elif cmd == "set" and len(parts) >= 3:
variable, value = parts[1], " ".join(parts[2:])
self.client.update_variable(variable, value)
elif cmd == "alerts" and len(parts) == 2:
self.toggle_alerts(parts[1])
elif cmd == "set_alert_threshold" and len(parts) == 3:
self.set_alert_threshold(parts[1], parts[2])
elif cmd == "show":
if len(parts) == 0:
group = "all"
else:
group = parts[0]
self.client.show_variables(group)
elif cmd == "ipoc":
ipoc = self.client.get_ipoc()
print(f"🛰 Current IPOC: {ipoc}")
elif cmd == "watch":
duration = float(parts[0]) if parts else None
self.client.watch_network(duration=duration)
elif cmd == "reset":
self.client.reset_variables()
elif cmd == "status":
self.client.show_config_file()
elif cmd == "reconnect":
self.client.reconnect()
elif cmd == "toggle" and len(parts) == 3:
self.client.toggle_digital_io(parts[1], parts[2])
elif cmd == "move_external" and len(parts) == 3:
self.client.move_external_axis(parts[1], parts[2])
elif cmd == "correct" and len(parts) == 4:
self.client.correct_position(parts[1], parts[2], parts[3])
elif cmd == "speed" and len(parts) == 3:
self.client.adjust_speed(parts[1], parts[2])
elif cmd == "override" and len(parts) == 2:
state = parts[1]
if state in ["on", "true", "1"]:
self.client.override_safety(True)
print("🛡️ Safety override ENABLED.")
elif state in ["off", "false", "0"]:
self.client.override_safety(False)
print("🛡️ Safety override DISABLED.")
else:
print("❌ Usage: override on | off")
elif cmd == "log":
if len(parts) < 1:
print("⚠️ Usage: log start|stop|status")
return
subcmd = parts[0].lower()
if subcmd == "start":
filename = self.client.start_logging()
print(f"✅ Logging started → {filename}")
elif subcmd == "stop":
self.client.stop_logging()
print("🛑 Logging stopped.")
elif subcmd == "status":
status = self.client.is_logging_active()
print("📊 Logging is currently", "ACTIVE ✅" if status else "INACTIVE ❌")
else:
print("⚠️ Unknown log subcommand. Use: start, stop, status")
elif cmd == "graph":
if len(parts) == 0:
print("⚠️ Usage: graph show <file> | graph compare <file1> <file2>")
return
sub = parts[0].lower()
if sub == "show" and len(parts) == 2:
self.client.visualise_csv_log(parts[1])
elif sub == "compare" and len(parts) == 3:
self.client.compare_test_runs(parts[1], parts[2])
else:
print("⚠️ Usage:\n graph show <file>\n graph compare <file1> <file2>")
elif cmd == "export" and len(parts) == 2:
self.export_data(parts[1])
elif cmd == "compare" and len(parts) == 3:
self.compare_test_runs(parts[1], parts[2])
elif cmd == "report" and len(parts) >= 3:
self.generate_report(parts[1], parts[2])
elif cmd == "exit":
self.client.stop_rsi()
self.running = False
elif cmd == "help":
self.show_help()
elif cmd == "visualize" and len(parts) >= 2:
csv_file = parts[1]
export = ("export" in parts)
self.visualize(csv_file, export)
elif cmd == "krlparse" and len(parts) == 4:
src_file, dat_file, output_file = parts[1], parts[2], parts[3]
self.krl_parse(src_file, dat_file, output_file)
elif cmd == "inject_rsi" and len(parts) >= 2:
input_krl = parts[1]
output_krl = parts[2] if len(parts) >= 3 else None
rsi_config = parts[3] if len(parts) == 4 else "RSIGatewayv1.rsi"
self.inject_rsi(input_krl, output_krl, rsi_config)
elif cmd == "show" and len(parts) == 2 and parts[1] == "all":
variables = self.client.show_variables()
print("📤 Send Variables:")
for k, v in variables["send_variables"].items():
print(f" {k}: {v}")
print("📥 Receive Variables:")
for k, v in variables["receive_variables"].items():
print(f" {k}: {v}")
elif cmd == "show" and len(parts) == 2 and parts[1] == "live":
data = self.client.get_live_data()
print("📡 Live Data:")
for k, v in data.items():
print(f" {k}: {v}")
elif cmd == "log" and len(parts) == 2 and parts[1] == "status":
active = self.client.is_logging_active()
print(f"📋 Logging is {'ACTIVE' if active else 'INACTIVE'}")
elif cmd == "move_cartesian" and len(parts) >= 3:
start_dict = self.parse_pose_string(parts[1])
end_dict = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int)
rate = self.extract_optional_value(parts, "rate", default=0.04, cast_type=float)
trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="cartesian")
self.client.execute_trajectory(trajectory, space="cartesian", rate=rate)
elif cmd == "move_joint" and len(parts) >= 3:
start_dict = self.parse_pose_string(parts[1])
end_dict = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int)
rate = self.extract_optional_value(parts, "rate", default=0.4, cast_type=float)
trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="joint")
self.client.execute_trajectory(trajectory, space="joint", rate=rate)
elif cmd == "queue_cartesian" and len(parts) >= 3:
start = self.parse_pose_string(parts[1])
end = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", 50, int)
rate = self.extract_optional_value(parts, "rate", 0.04, float)
traj = self.client.generate_trajectory(start, end, steps, "cartesian")
self.client.queue_trajectory(traj, "cartesian", rate)
elif cmd == "queue_joint" and len(parts) >= 3:
start = self.parse_pose_string(parts[1])
end = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", 50, int)
rate = self.extract_optional_value(parts, "rate", 0.04, float)
traj = self.client.generate_trajectory(start, end, steps, "joint")
self.client.queue_trajectory(traj, "joint", rate)
elif cmd == "execute_queue":
self.client.execute_queued_trajectories()
elif cmd == "clear_queue":
self.client.clear_trajectory_queue()
elif cmd == "show_queue":
queue = self.client.get_trajectory_queue()
print(f"🧾 Trajectory Queue: {len(queue)} items")
for i, q in enumerate(queue):
print(f" {i + 1}. {q['space']} | {q['steps']} steps | {q['rate']}s")
elif cmd == "export_movement_data" and len(parts) == 2:
result = self.client.export_movement_data(parts[1])
print(result)
elif cmd == "compare_test_runs" and len(parts) == 3:
result = self.client.compare_test_runs(parts[1], parts[2])
print("📊 Comparison Results:")
for key, stats in result.items():
print(f"{key}: mean_diff={stats['mean_diff']:.3f}, max_diff={stats['max_diff']:.3f}")
elif cmd == "generate_report" and len(parts) in [2, 3]:
output = parts[2] if len(parts) == 3 else "report.txt"
result = generate_report(parts[1], output)
print(result)
elif cmd == "safety-stop":
self.client.safety_manager.emergency_stop()
print("🛑 Safety: Emergency Stop activated.")
elif cmd == "safety-reset":
self.client.safety_manager.reset_stop()
print("✅ Safety: Emergency Stop reset. Motion allowed.")
elif cmd == "safety-status":
sm = self.client.safety_manager
override_active = self.client.is_safety_overridden()
print("🛡️ Safety System Status:")
print(" 🧱 Emergency Stop:", "ACTIVE 🛑" if sm.is_stopped() else "CLEARED ✅")
print(" 🛡️ Safety Override:", "ENABLED" if override_active else "DISABLED")
print(" 📏 Enforced Limits:")
for var, (lo, hi) in sm.get_limits().items():
print(f" - {var}: {lo}{hi}")
elif cmd == "plot" and len(parts) >= 2:
plot_type = parts[1]
if len(parts) < 3:
print("⚠️ Usage: plot <type> <csv_path> [overlay_path]")
return
csv_path = parts[2]
overlay_path = parts[3] if len(parts) >= 4 else None
result = self.client.generate_plot(csv_path, plot_type=plot_type, overlay_path=overlay_path)
print(result)
elif cmd == "safety-set-limit" and len(parts) == 4:
var, lo, hi = parts[1], parts[2], parts[3]
try:
lo = float(lo)
hi = float(hi)
self.client.safety_manager.set_limit(var, lo, hi)
print(f"✅ Set limit for {var}: {lo} to {hi}")
except ValueError:
print("❌ Invalid numbers for limit. Usage: safety-set-limit RKorr.X -5 5")
else:
print("❌ Unknown command. Type 'help' for a list of commands.")
def toggle_alerts(self, state):
"""Enable or disable real-time alerts."""
if state == "on":
self.client.enable_alerts(True)
print("✅ Real-time alerts enabled.")
elif state == "off":
self.client.enable_alerts(False)
print("✅ Real-time alerts disabled.")
else:
print("❌ Invalid option. Use 'alerts on' or 'alerts off'.")
def set_alert_threshold(self, alert_type, value):
"""Set thresholds for deviation or force alerts."""
try:
value = float(value)
if alert_type in ["deviation", "force"]:
self.client.set_alert_threshold(alert_type, value)
print(f"{alert_type.capitalize()} alert threshold set to {value}")
else:
print("❌ Invalid alert type. Use 'deviation' or 'force'.")
except ValueError:
print("❌ Invalid threshold value. Enter a numeric value.")
def export_data(self, filename):
"""Export movement data to a CSV file."""
self.client.export_movement_data(filename)
print(f"✅ Data exported to {filename}")
def compare_test_runs(self, file1, file2):
"""Compare two test runs from CSV files."""
result = self.client.compare_test_runs(file1, file2)
print(result)
def generate_report(self, filename, format_type):
"""Generate a statistical report from movement data."""
if format_type not in ["csv", "json", "pdf"]:
print("❌ Invalid format. Use 'csv', 'json', or 'pdf'.")
return
self.client.generate_report(filename, format_type)
print(f"✅ Report generated: {filename}.{format_type}")
def show_help(self):
"""Displays the list of available commands."""
print("""
Available Commands:
start, stop, exit
set <var> <value>, show, ipoc, watch, reset, status, reconnect
toggle <DiO/DiL> <0/1>, move_external <axis> <value>
correct <RKorr/AKorr> <X/Y/Z/A/B/C> <value>, speed <Tech.TX> <value>
override <limit>
log start <file>.csv, log stop, log status
graph start <mode>, graph stop, graph mode <position|velocity|acceleration|force>
graph overlay on/off, graph load_plan <file>
export <filename.csv>
compare <file1.csv> <file2.csv>
report <filename> <csv|json|pdf>
alerts on/off
set_alert_threshold <deviation|force> <value>
show all - Show all current send and receive variables
show live - Show real-time TCP, force, and IPOC values
log status - Display whether logging is currently active
move_cartesian <start> <end> [steps=50] [rate=0.012]
e.g., X=0,Y=0,Z=500 A=100,Y=0,Z=500 steps=100 rate=0.012
move_joint <start> <end> [steps=50] [rate=0.012]
e.g., A1=0,... A1=90,... steps=60
queue_cartesian <start> <end> [steps=50] [rate=0.012] - Queue linear Cartesian trajectory
queue_joint <start> <end> [steps=50] [rate=0.012] - Queue linear Joint trajectory
show_queue - Show queued trajectory segments
clear_queue - Clear all queued trajectories
execute_queue - Execute all queued motions
export_movement_data <csvfile> - Export logged motion data to CSV
compare_test_runs <file1> <file2> - Compare 2 test logs (e.g. deviation)
generate_report <input.csv> [out.txt] - Create a movement analysis report
safety-stop - Emergency stop: block motion
safety-reset - Reset emergency stop
safety-status - Show safety and override status
override on/off - Enable or disable safety override
alerts on/off
set_alert_threshold <deviation|force> <value>
""")
def visualise(self, csv_file, export=False):
try:
visualiser = KukaRSIVisualiser(csv_file)
visualiser.plot_trajectory()
visualiser.plot_joint_positions()
visualiser.plot_force_trends()
if export:
visualiser.export_graphs()
print(f"✅ Visualisations exported for '{csv_file}'")
match cmd:
case "start":
print(self.client.start_rsi())
case "stop":
print(self.client.stop_rsi())
case "exit":
self.exit()
case "set":
var, val = args[0], args[1]
print(self.client.update_variable(var, val))
case "show":
print("📤 Send Variables:")
self.client.show_variables()
case "reset":
print(self.client.reset_variables())
case "status":
print(self.client.show_config_file())
case "ipoc":
print(f"🛰 IPOC: {self.client.get_ipoc()}")
case "watch":
duration = float(args[0]) if args else None
self.client.watch_network(duration)
case "reconnect":
print(self.client.reconnect())
case "alerts":
state = args[0].lower()
self.client.enable_alerts(state == "on")
case "set_alert_threshold":
alert_type, value = args[0], float(args[1])
self.client.set_alert_threshold(alert_type, value)
case "toggle":
group, name, value = args
print(self.client.toggle_digital_io(group, name, value))
case "move_external":
axis, value = args
print(self.client.move_external_axis(axis, value))
case "correct":
corr_type, axis, value = args
print(self.client.correct_position(corr_type, axis, value))
case "speed":
tech_param, value = args
print(self.client.adjust_speed(tech_param, value))
case "override":
state = args[0]
self.client.override_safety(state in ["on", "true", "1"])
case "log":
subcmd = args[0]
if subcmd == "start":
print(f"✅ Logging to {self.client.start_logging()}")
elif subcmd == "stop":
print(self.client.stop_logging())
elif subcmd == "status":
print("📋", "ACTIVE" if self.client.is_logging_active() else "INACTIVE")
case "graph":
sub = args[0]
if sub == "show":
self.client.visualise_csv_log(args[1])
elif sub == "compare":
print(self.client.compare_test_runs(args[1], args[2]))
case "plot":
plot_type, csv_path = args[0], args[1]
overlay = args[2] if len(args) > 2 else None
print(self.client.generate_plot(csv_path, plot_type, overlay))
case "move_cartesian":
start = self.parse_pose(args[0])
end = self.parse_pose(args[1])
steps = self.extract_value(args, "steps", 50, int)
rate = self.extract_value(args, "rate", 0.04, float)
self.client.move_cartesian_trajectory(start, end, steps, rate)
case "move_joint":
start = self.parse_pose(args[0])
end = self.parse_pose(args[1])
steps = self.extract_value(args, "steps", 50, int)
rate = self.extract_value(args, "rate", 0.04, float)
self.client.move_joint_trajectory(start, end, steps, rate)
case "queue_cartesian":
start = self.parse_pose(args[0])
end = self.parse_pose(args[1])
steps = self.extract_value(args, "steps", 50, int)
rate = self.extract_value(args, "rate", 0.04, float)
self.client.queue_cartesian_trajectory(start, end, steps, rate)
case "queue_joint":
start = self.parse_pose(args[0])
end = self.parse_pose(args[1])
steps = self.extract_value(args, "steps", 50, int)
rate = self.extract_value(args, "rate", 0.04, float)
self.client.queue_joint_trajectory(start, end, steps, rate)
case "execute_queue":
self.client.execute_queued_trajectories()
case "clear_queue":
self.client.clear_trajectory_queue()
case "show_queue":
print(self.client.get_trajectory_queue())
case "export_movement_data":
print(self.client.export_movement_data(args[0]))
case "compare_test_runs":
print(self.client.compare_test_runs(args[0], args[1]))
case "generate_report":
print(self.client.generate_report(args[0], args[1]))
case "safety-stop":
self.client.safety_stop()
case "safety-reset":
self.client.safety_reset()
case "safety-status":
print(self.client.safety_status())
case "safety-set-limit":
var, lo, hi = args
self.client.safety_set_limit(var, lo, hi)
case "krlparse":
self.client.parse_krl_to_csv(args[0], args[1], args[2])
case "inject_rsi":
input_krl = args[0]
output_krl = args[1] if len(args) > 1 else None
rsi_cfg = args[2] if len(args) > 2 else "RSIGatewayv1.rsi"
self.client.inject_rsi(input_krl, output_krl, rsi_cfg)
case "visualize":
self.client.visualise_csv_log(args[0], export="export" in args)
case "help":
self.show_help()
case _:
print("❌ Unknown command. Type 'help'.")
except Exception as e:
print(f"❌ Failed to visualize '{csv_file}': {e}")
print(f"❌ Error: {e}")
def krl_parse(self, src_file, dat_file, output_file):
"""CLI method to parse KRL files and output CSV."""
try:
parser = KRLParser(src_file, dat_file)
parser.parse_src()
parser.parse_dat()
parser.export_csv(output_file)
print(f"✅ KRL files parsed successfully. Output CSV: {output_file}")
except Exception as e:
print(f"❌ Failed to parse KRL files: {e}")
def parse_pose(self, pose_string):
return dict(item.split("=") for item in pose_string.split(","))
def inject_rsi(self, input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"):
"""Inject RSI commands into a KRL file via CLI."""
try:
inject_rsi_to_krl(input_krl, output_krl, rsi_config)
output_path = output_krl if output_krl else input_krl
print(f"✅ RSI commands successfully injected into '{output_path}'")
except Exception as e:
print(f"❌ Failed to inject RSI commands: {e}")
def extract_optional_value(self, parts, key, default=0, cast_type=float):
"""
Extracts optional arguments like 'steps=100' or 'rate=0.01'
"""
for part in parts[3:]: # skip cmd, start, end
if part.startswith(f"{key}="):
def extract_value(self, args, key, default, cast_type):
for arg in args[2:]:
if arg.startswith(f"{key}="):
try:
return cast_type(part.split("=")[1])
return cast_type(arg.split("=")[1])
except ValueError:
return default
return default
def exit(self):
print("🛑 Exiting RSI CLI...")
self.client.stop_rsi()
self.running = False
def show_help(self):
print("""
Available Commands:
start, stop, exit
set <var> <value>
show, status, ipoc, watch, reset, reconnect
alerts on/off, set_alert_threshold <type> <value>
toggle <group> <name> <state>
move_external <axis> <value>, correct <RKorr/AKorr> <axis> <value>
speed <TechParam> <value>
log start|stop|status
graph show <csv> | graph compare <csv1> <csv2>
plot <type> <csv> [overlay]
move_cartesian, move_joint, queue_cartesian, queue_joint
execute_queue, clear_queue, show_queue
export_movement_data <file>
compare_test_runs <file1> <file2>
generate_report <file> <format>
safety-stop, safety-reset, safety-status, safety-set-limit
krlparse <src> <dat> <output>
inject_rsi <input> [output] [rsi_config]
visualize <csv> [export]
help
""")
if __name__ == "__main__":
config_file = "RSI_EthernetConfig.xml"
cli = RSICommandLineInterface(config_file)
cli = RSICommandLineInterface("RSI_EthernetConfig.xml")
cli.run()

View File

@ -4,15 +4,14 @@ import time
from .config_parser import ConfigParser
from .network_handler import NetworkProcess
from .safety_manager import SafetyManager
import threading
class RSIClient:
"""Main RSI API class that integrates network, config handling, and message processing."""
def __init__(self, config_file, rsi_limits_file=None):
"""Initialize the RSI client and set up logging and networking."""
logging.info(f"📂 Loading RSI configuration from {config_file}...")
def __init__(self, config_file, rsi_limits_file=None):
logging.info(f"Loading RSI configuration from {config_file}...")
# Load configuration
self.config_parser = ConfigParser(config_file, rsi_limits_file)
network_settings = self.config_parser.get_network_settings()
@ -20,95 +19,73 @@ class RSIClient:
self.send_variables = self.manager.dict(self.config_parser.send_variables)
self.receive_variables = self.manager.dict(self.config_parser.receive_variables)
self.stop_event = multiprocessing.Event()
self.start_event = multiprocessing.Event() # ✅ NEW
# ✅ Initialise safety manager from limits
self.safety_manager = SafetyManager(self.config_parser.safety_limits)
logging.info(f"🚀 Starting network process on {network_settings['ip']}:{network_settings['port']}...")
# ✅ Corrected constructor call with all necessary parameters
# ✅ Create NetworkProcess but don't start communication yet
self.network_process = NetworkProcess(
network_settings["ip"],
network_settings["port"],
self.send_variables,
self.receive_variables,
self.stop_event,
self.config_parser
self.config_parser,
self.start_event
)
self.network_process.start()
self.logger = None # Placeholder for logging module
self.logger = None
def start(self):
"""Keep the client running and allow periodic debugging."""
logging.info("✅ RSI Client Started")
print("✅ RSI Client Started. Press CTRL+C to stop.")
"""Send start signal to NetworkProcess and run control loop."""
logging.info("RSIClient sending start signal to NetworkProcess...")
self.start_event.set()
self.running = True
logging.info("RSI Client Started")
try:
while not self.stop_event.is_set():
while self.running and not self.stop_event.is_set():
time.sleep(2)
except KeyboardInterrupt:
self.stop()
except Exception as e:
logging.error(f"❌ RSI Client encountered an error: {e}")
print(f"❌ RSI Client encountered an error: {e}")
logging.error(f"RSI Client encountered an error: {e}")
def stop(self):
"""Stop the network process safely and close resources."""
"""Stop the network process and the client thread safely."""
logging.info("🛑 Stopping RSI Client...")
print("🛑 Stopping RSI Client...")
self.stop_event.set() # ✅ Signal all processes to stop
self.running = False
self.stop_event.set() # ✅ Tell network process to exit nicely
if self.network_process.is_alive():
self.network_process.terminate()
self.network_process.join()
if self.network_process and self.network_process.is_alive():
self.network_process.join(timeout=3) # ✅ Give it time to shutdown
if self.network_process.is_alive():
logging.warning("⚠️ Forcing network process termination...")
self.network_process.terminate()
self.network_process.join()
if hasattr(self, "thread") and self.thread and self.thread.is_alive():
self.thread.join()
self.thread = None
logging.info("✅ RSI Client Stopped")
print("✅ RSI Client Stopped")
def update_send_variable(self, name, value):
print(f"[DEBUG] update_send_variable called with: {name} = {value}")
if "." in name:
parent, child = name.split(".", 1)
if parent in self.send_variables:
current = dict(self.send_variables[parent]) # copy inner dict
current[child] = float(value)
self.send_variables[parent] = current # reassign to trigger proxy update
return f"✅ Updated {name} to {value}"
else:
return f"❌ Parent variable '{parent}' not found in send_variables"
else:
self.send_variables[name] = float(value)
return f"✅ Updated {name} to {value}"
def start_logging(self, filename):
if hasattr(self.network_process, "start_logging"):
self.network_process.start_logging(filename)
def stop_logging(self):
if hasattr(self.network_process, "stop_logging"):
self.network_process.stop_logging()
def is_logging_active(self):
return self.network_process.is_logging_active()
def enable_alerts(self, enable):
if hasattr(self.network_process, "enable_alerts"):
self.network_process.enable_alerts(enable)
def set_alert_threshold(self, alert_type, threshold):
if hasattr(self.network_process, "set_alert_threshold"):
self.network_process.set_alert_threshold(alert_type, threshold)
def reset_send_variables(self):
self.send_variables.update(self.config_parser.send_variables.copy())
def reconnect(self):
if self.network_process.is_alive():
"""Reconnects the network process safely."""
logging.info("Reconnecting RSI Client network...")
if self.network_process and self.network_process.is_alive():
self.stop_event.set()
self.network_process.terminate()
self.network_process.join()
# Fresh new events
self.stop_event = multiprocessing.Event()
self.start_event = multiprocessing.Event()
# Create new network process
network_settings = self.config_parser.get_network_settings()
self.network_process = NetworkProcess(
network_settings["ip"],
@ -116,24 +93,11 @@ class RSIClient:
self.send_variables,
self.receive_variables,
self.stop_event,
self.config_parser
self.config_parser,
self.start_event
)
self.network_process.start()
def get_movement_data(self):
"""Returns a list of all logged data entries."""
if hasattr(self, "logger") and self.logger:
return self.logger.get_all_records()
return []
if __name__ == "__main__":
config_file = "RSI_EthernetConfig.xml"
client = RSIClient(config_file)
try:
client.start()
except KeyboardInterrupt:
client.stop()
# Fresh control thread
self.thread = threading.Thread(target=self.start, daemon=True)
self.thread.start()

View File

@ -3,7 +3,7 @@ import logging
from src.RSIPI.rsi_limit_parser import parse_rsi_limits
# ✅ Configure Logging (toggleable)
LOGGING_ENABLED = True # Change to False to silence logging output
LOGGING_ENABLED = True # Change too False to silence logging output
if LOGGING_ENABLED:
logging.basicConfig(
@ -67,9 +67,9 @@ class RSIConfig:
if self.rsi_limits_file:
try:
self.safety_limits = parse_rsi_limits(self.rsi_limits_file)
logging.info(f"Loaded safety limits from {self.rsi_limits_file}")
logging.info(f"Loaded safety limits from {self.rsi_limits_file}")
except Exception as e:
logging.warning(f"⚠️ Failed to load RSI safety limits: {e}")
logging.warning(f"Failed to load RSI safety limits: {e}")
self.safety_limits = {}
@staticmethod

View File

@ -60,8 +60,8 @@ class EchoServer:
self.running = True
self.thread = threading.Thread(target=self.send_message, daemon=True)
logging.info(f"Echo Server started on {self.server_address}")
print(f"Echo Server started in {self.mode.upper()} mode.")
logging.info(f"Echo Server started on {self.server_address}")
print(f"Echo Server started in {self.mode.upper()} mode.")
def receive_and_process(self):
"""
@ -98,7 +98,7 @@ class EchoServer:
elif tag == "IPOC":
self.ipoc_value = int(elem.text.strip())
logging.debug(f"Processed input: {ET.tostring(root).decode()}")
logging.debug(f"Processed input: {ET.tostring(root).decode()}")
except socket.timeout:
pass # No data within delay window
except ConnectionResetError:
@ -148,7 +148,7 @@ class EchoServer:
def stop(self):
"""Stops the echo server and cleans up the socket."""
print("🛑 Stopping Echo Server...")
print("Stopping Echo Server...")
self.running = False
self.thread.join()
self.udp_socket.close()

View File

@ -118,9 +118,9 @@ class RSIGraphing:
"""Switch graphing mode at runtime (position, velocity, acceleration, force)."""
if mode in ["position", "velocity", "acceleration", "force"]:
self.mode = mode
print(f"Graphing mode changed to: {mode}")
print(f"Graphing mode changed to: {mode}")
else:
print("Invalid mode. Available: position, velocity, acceleration, force")
print("Invalid mode. Available: position, velocity, acceleration, force")
def set_alert_threshold(self, alert_type, threshold):
"""Update threshold values for alerts."""
@ -128,12 +128,12 @@ class RSIGraphing:
self.deviation_threshold = threshold
elif alert_type == "force":
self.force_threshold = threshold
print(f"{alert_type.capitalize()} alert threshold set to {threshold}")
print(f"{alert_type.capitalize()} alert threshold set to {threshold}")
def enable_alerts(self, enable):
"""Enable or disable real-time alerts."""
self.alerts_enabled = enable
print(f"Alerts {'enabled' if enable else 'disabled'}.")
print(f"Alerts {'enabled' if enable else 'disabled'}.")
def stop(self):
"""Gracefully stop plotting by closing the figure."""