Compare commits

..

2 Commits

12 changed files with 448 additions and 626 deletions

View File

@ -1,3 +1,4 @@
import logging
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
class ConfigParser: class ConfigParser:
@ -70,9 +71,6 @@ class ConfigParser:
print(f"[WARNING] Failed to load .rsi.xml safety limits: {e}") print(f"[WARNING] Failed to load .rsi.xml safety limits: {e}")
self.safety_limits = {} self.safety_limits = {}
print(f"✅ Final Send Variables: {self.send_variables}")
print(f"✅ Final Receive Variables: {self.receive_variables}")
def process_config(self): def process_config(self):
""" """
Parses the RSI config file and builds the send/receive variable dictionaries. Parses the RSI config file and builds the send/receive variable dictionaries.
@ -90,7 +88,7 @@ class ConfigParser:
# Extract <CONFIG> section for IP/port/etc. # Extract <CONFIG> section for IP/port/etc.
config = root.find("CONFIG") config = root.find("CONFIG")
if config is None: if config is None:
raise ValueError("Missing <CONFIG> section in RSI_EthernetConfig.xml") raise ValueError("Missing <CONFIG> section in RSI_EthernetConfig.xml")
self.network_settings = { self.network_settings = {
"ip": config.find("IP_NUMBER").text.strip() if config.find("IP_NUMBER") is not None else None, "ip": config.find("IP_NUMBER").text.strip() if config.find("IP_NUMBER") is not None else None,
@ -102,7 +100,7 @@ class ConfigParser:
print(f"✅ Loaded network settings: {self.network_settings}") print(f"✅ Loaded network settings: {self.network_settings}")
if None in self.network_settings.values(): if None in self.network_settings.values():
raise ValueError("Missing one or more required network settings (ip, port, sentype, onlysend)") raise ValueError("Missing one or more required network settings (ip, port, sentype, onlysend)")
# Parse SEND section # Parse SEND section
send_section = root.find("SEND/ELEMENTS") send_section = root.find("SEND/ELEMENTS")
@ -123,7 +121,7 @@ class ConfigParser:
return send_vars, receive_vars return send_vars, receive_vars
except Exception as e: except Exception as e:
print(f"Error processing config file: {e}") logging.error(f"Error processing config file: {e}")
return {}, {} return {}, {}
def process_variable_structure(self, var_dict, tag, var_type, indx=""): def process_variable_structure(self, var_dict, tag, var_type, indx=""):
@ -138,24 +136,19 @@ class ConfigParser:
""" """
tag = tag.replace("DEF_", "") # Remove DEF_ prefix if present tag = tag.replace("DEF_", "") # Remove DEF_ prefix if present
print(f"🔍 Assigning {tag}: INDX={indx}, TYPE={var_type}")
if tag in self.internal_structure: if tag in self.internal_structure:
# If pre-defined internally, copy structure # If pre-defined internally, copy structure
internal_value = self.internal_structure[tag] internal_value = self.internal_structure[tag]
var_dict[tag] = internal_value.copy() if isinstance(internal_value, dict) else internal_value var_dict[tag] = internal_value.copy() if isinstance(internal_value, dict) else internal_value
print(f"✅ INTERNAL Match: {tag} -> {var_dict[tag]}")
elif "." in tag: elif "." in tag:
# Handle nested dictionary e.g. Tech.T21 -> { 'Tech': { 'T21': 0.0 } } # Handle nested dictionary e.g. Tech.T21 -> { 'Tech': { 'T21': 0.0 } }
parent, subkey = tag.split(".", 1) parent, subkey = tag.split(".", 1)
if parent not in var_dict: if parent not in var_dict:
var_dict[parent] = {} var_dict[parent] = {}
var_dict[parent][subkey] = self.get_default_value(var_type) var_dict[parent][subkey] = self.get_default_value(var_type)
print(f"📂 Assigned '{tag}' as nested dictionary under '{parent}': {var_dict[parent]}")
else: else:
# Standard single-value variable # Standard single-value variable
var_dict[tag] = self.get_default_value(var_type) var_dict[tag] = self.get_default_value(var_type)
print(f"📄 Assigned Standard Value: '{tag}' -> {var_dict[tag]}")
@staticmethod @staticmethod
def rename_tech_keys(var_dict): def rename_tech_keys(var_dict):
@ -171,7 +164,6 @@ class ConfigParser:
tech_data.update(var_dict.pop(key)) tech_data.update(var_dict.pop(key))
if tech_data: if tech_data:
var_dict["Tech"] = tech_data var_dict["Tech"] = tech_data
print(f"✅ Renamed Tech.XX keys to 'Tech': {var_dict['Tech']}")
@staticmethod @staticmethod
def get_default_value(var_type): def get_default_value(var_type):

View File

@ -1,4 +1,5 @@
import csv import csv
import logging
import re import re
from collections import OrderedDict from collections import OrderedDict
@ -28,8 +29,6 @@ class KRLParser:
if label not in self.labels_to_extract: if label not in self.labels_to_extract:
self.labels_to_extract.append(label) self.labels_to_extract.append(label)
print(f"📌 Found labels in .src: {self.labels_to_extract}")
def parse_dat(self): def parse_dat(self):
""" """
Parses the .dat file and retrieves Cartesian coordinates for each label. Parses the .dat file and retrieves Cartesian coordinates for each label.
@ -58,8 +57,6 @@ class KRLParser:
self.positions[label] = coords self.positions[label] = coords
print(f"📥 Parsed {len(self.positions)} positions from .dat")
def export_csv(self, output_file): def export_csv(self, output_file):
""" """
Writes the extracted Cartesian positions into a structured CSV file, Writes the extracted Cartesian positions into a structured CSV file,
@ -90,9 +87,9 @@ class KRLParser:
}) })
sequence_number += 1 sequence_number += 1
else: else:
print(f"⚠️ Skipped missing/deleted point: {label}") logging.warning(f"Skipped missing/deleted point: {label}")
print(f"CSV exported successfully to {output_file} with {sequence_number} points.") logging.info(f"CSV exported successfully to {output_file} with {sequence_number} points.")
# Optional CLI usage # Optional CLI usage

View File

@ -152,9 +152,9 @@ if __name__ == "__main__":
if args.limits: if args.limits:
from src.RSIPI.rsi_limit_parser import parse_rsi_limits from src.RSIPI.rsi_limit_parser import parse_rsi_limits
limits = parse_rsi_limits(args.limits) limits = parse_rsi_limits(args.limits)
visualiser = Kukarsivisualiser(args.csv_file, safety_limits=limits) visualiser = KukaRSIVisualiser(args.csv_file, safety_limits=limits)
else: else:
visualiser = Kukarsivisualiser(args.csv_file) visualiser = KukaRSIVisualiser(args.csv_file)
visualiser.plot_trajectory() visualiser.plot_trajectory()
visualiser.plot_joint_positions() visualiser.plot_joint_positions()

130
src/RSIPI/live_plotter.py Normal file
View File

@ -0,0 +1,130 @@
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from collections import deque
from threading import Thread, Lock
import time
class LivePlotter:
def __init__(self, client, mode="3d", interval=100):
self.client = client
self.mode = mode
self.interval = interval
self.running = False
# Plot data buffers
self.time_data = deque(maxlen=500)
self.position_data = {k: deque(maxlen=500) for k in ["X", "Y", "Z"]}
self.velocity_data = {k: deque(maxlen=500) for k in ["X", "Y", "Z"]}
self.acceleration_data = {k: deque(maxlen=500) for k in ["X", "Y", "Z"]}
self.joint_data = {f"A{i}": deque(maxlen=500) for i in range(1, 7)}
self.force_data = {f"A{i}": deque(maxlen=500) for i in range(1, 7)}
self.previous_positions = {"X": 0, "Y": 0, "Z": 0}
self.previous_velocities = {"X": 0, "Y": 0, "Z": 0}
self.previous_time = time.time()
self.lock = Lock()
self.collector_thread = None
self.fig = plt.figure()
self.ax = self.fig.add_subplot(111, projection="3d" if self.mode == "3d" else None)
def start(self):
self.running = True
self.collector_thread = Thread(target=self.collect_data_loop, daemon=True)
self.collector_thread.start()
self.ani = animation.FuncAnimation(self.fig, self.update_plot, interval=self.interval)
try:
plt.show()
except RuntimeError:
print("⚠️ Matplotlib GUI interrupted during shutdown.")
self.running = False
def stop(self, save_path: str = None):
self.running = False
if save_path:
try:
self.fig.savefig(save_path, bbox_inches="tight")
print(f"📸 Plot saved to '{save_path}'")
except Exception as e:
print(f"❌ Failed to save plot: {e}")
plt.close(self.fig)
def collect_data_loop(self):
while self.running:
with self.lock:
current_time = time.time()
dt = current_time - self.previous_time
self.previous_time = current_time
self.time_data.append(current_time)
position = self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0})
joints = self.client.receive_variables.get("AIPos", {f"A{i}": 0 for i in range(1, 7)})
force = self.client.receive_variables.get("MaCur", {f"A{i}": 0 for i in range(1, 7)})
for axis in ["X", "Y", "Z"]:
vel = (position[axis] - self.previous_positions[axis]) / dt if dt > 0 else 0
acc = (vel - self.previous_velocities[axis]) / dt if dt > 0 else 0
self.previous_positions[axis] = position[axis]
self.previous_velocities[axis] = vel
self.position_data[axis].append(position[axis])
self.velocity_data[axis].append(vel)
self.acceleration_data[axis].append(acc)
for i in range(1, 7):
self.joint_data[f"A{i}"].append(joints.get(f"A{i}", 0))
self.force_data[f"A{i}"].append(force.get(f"A{i}", 0))
time.sleep(self.interval / 1000.0)
def update_plot(self, frame):
if not self.running:
return
with self.lock:
self.ax.clear()
self.render_plot()
def render_plot(self):
if self.mode == "3d":
self.ax.set_title("Live 3D TCP Trajectory")
self.ax.plot(self.position_data["X"], self.position_data["Y"], self.position_data["Z"], label="TCP Path")
self.ax.set_xlabel("X")
self.ax.set_ylabel("Y")
self.ax.set_zlabel("Z")
elif self.mode == "2d_xy":
self.ax.set_title("Live 2D Trajectory (X-Y)")
self.ax.plot(self.position_data["X"], self.position_data["Y"], label="XY Path")
self.ax.set_xlabel("X")
self.ax.set_ylabel("Y")
elif self.mode == "velocity":
self.ax.set_title("Live TCP Velocity")
self.ax.plot(self.time_data, self.velocity_data["X"], label="dX/dt")
self.ax.plot(self.time_data, self.velocity_data["Y"], label="dY/dt")
self.ax.plot(self.time_data, self.velocity_data["Z"], label="dZ/dt")
self.ax.set_ylabel("Velocity [mm/s]")
elif self.mode == "acceleration":
self.ax.set_title("Live TCP Acceleration")
self.ax.plot(self.time_data, self.acceleration_data["X"], label="d²X/dt²")
self.ax.plot(self.time_data, self.acceleration_data["Y"], label="d²Y/dt²")
self.ax.plot(self.time_data, self.acceleration_data["Z"], label="d²Z/dt²")
self.ax.set_ylabel("Acceleration [mm/s²]")
elif self.mode == "joints":
self.ax.set_title("Live Joint Angles")
for j, values in self.joint_data.items():
self.ax.plot(self.time_data, values, label=j)
self.ax.set_ylabel("Angle [deg]")
elif self.mode == "force":
self.ax.set_title("Live Motor Currents")
for j, values in self.force_data.items():
self.ax.plot(self.time_data, values, label=j)
self.ax.set_ylabel("Current [Nm]")
self.ax.set_xlabel("Time")
self.ax.legend()
self.ax.grid(True)
self.fig.tight_layout()
def change_mode(self, mode):
self.mode = mode
self.ax = self.fig.add_subplot(111, projection="3d" if mode == "3d" else None)

View File

@ -1,16 +1,19 @@
from src.RSIPI.rsi_api import RSIAPI from src.RSIPI.rsi_api import RSIAPI
from src.RSIPI.live_plotter import LivePlotter
import math
import time import time
import threading
def main(): def main():
# Step 1: Create API instance
api = RSIAPI("RSI_EthernetConfig.xml") api = RSIAPI("RSI_EthernetConfig.xml")
time.sleep(10)
# Step 2: Start RSI connection
print("🔌 Starting RSI client...")
api.start_rsi()
time.sleep(10)
# Step 10: Stop RSI connection
print("🛑 Stopping RSI client...")
api.stop_rsi()
print("✅ All safety methods tested successfully.")
print(api.compare_test_runs("25-04-2025_16-33-47.csv", "25-04-2025_20-57-59.csv"))
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -1,81 +1,73 @@
import multiprocessing import multiprocessing
import socket import socket
import time
import csv
import logging import logging
import xml.etree.ElementTree as ET # ✅ FIX: Import ElementTree import xml.etree.ElementTree as ET
from .xml_handler import XMLGenerator from .xml_handler import XMLGenerator
from .safety_manager import SafetyManager from .safety_manager import SafetyManager
class NetworkProcess(multiprocessing.Process): class NetworkProcess(multiprocessing.Process):
"""Handles UDP communication and optional CSV logging in a separate process.""" """Handles UDP communication and optional CSV logging in a separate process."""
def __init__(self, ip, port, send_variables, receive_variables, stop_event, config_parser): def __init__(self, ip, port, send_variables, receive_variables, stop_event, config_parser, start_event):
super().__init__() super().__init__()
self.send_variables = send_variables self.send_variables = send_variables
self.receive_variables = receive_variables self.receive_variables = receive_variables
self.stop_event = stop_event self.stop_event = stop_event
self.start_event = start_event # ✅ NEW
self.config_parser = config_parser self.config_parser = config_parser
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.safety_manager = SafetyManager(config_parser.safety_limits) self.safety_manager = SafetyManager(config_parser.safety_limits)
self.client_address = (ip, port) self.client_address = (ip, port)
self.logging_active = multiprocessing.Value('b', False)
if not self.is_valid_ip(ip): self.log_filename = multiprocessing.Array('c', 256)
logging.warning(f"Invalid IP address '{ip}' detected. Falling back to '0.0.0.0'.")
print(f"⚠️ Invalid IP '{ip}', falling back to '0.0.0.0'.")
self.client_address = ('0.0.0.0', port)
else:
self.client_address = (ip, port)
self.logging_active = multiprocessing.Value('b', False) # Shared flag for logging
self.log_filename = multiprocessing.Array('c', 256) # Shared memory for filename
self.csv_process = None self.csv_process = None
self.controller_ip_and_port = None
def run(self):
"""Start the network loop."""
self.start_event.wait() # ✅ Wait until RSIClient sends start signal
try: try:
if not self.is_valid_ip(self.client_address[0]):
logging.warning(f"Invalid IP address '{self.client_address[0]}'. Falling back to '0.0.0.0'.")
self.client_address = ('0.0.0.0', self.client_address[1])
self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.udp_socket.bind(self.client_address) self.udp_socket.bind(self.client_address)
logging.info(f"✅ Network process initialized on {self.client_address}") logging.info(f"✅ Network process bound on {self.client_address}")
except OSError as e: except OSError as e:
logging.error(f"❌ Failed to bind to {self.client_address}: {e}") logging.error(f"❌ Failed to bind to {self.client_address}: {e}")
raise raise
self.controller_ip_and_port = None
@staticmethod
def is_valid_ip(ip):
"""Check if an IP address is valid and can be bound on this machine."""
try:
socket.inet_aton(ip) # Validate format
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.bind((ip, 0)) # Try binding
return True
except (socket.error, OSError):
return False
def run(self):
"""Start the network loop."""
print("[DEBUG] Network process started.")
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
self.udp_socket.settimeout(5) self.udp_socket.settimeout(5)
data_received, self.controller_ip_and_port = self.udp_socket.recvfrom(1024) data_received, self.controller_ip_and_port = self.udp_socket.recvfrom(1024)
#print("Receive: ", data_received)
message = data_received.decode() message = data_received.decode()
self.process_received_data(message) self.process_received_data(message)
#print("Network :", self.send_variables)
send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings) send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings)
#print("Send:", send_xml)
self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port) self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port)
# ✅ If logging is active, write data to CSV
if self.logging_active.value: if self.logging_active.value:
self.log_to_csv() self.log_to_csv()
except socket.timeout: except socket.timeout:
print("[WARNING] No message received within timeout period.") logging.error("[WARNING] No message received within timeout period.")
except Exception as e: except Exception as e:
print(f"[ERROR] Network process error: {e}") logging.error(f"[ERROR] Network process error: {e}")
@staticmethod
def is_valid_ip(ip):
try:
socket.inet_aton(ip)
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.bind((ip, 0))
return True
except (socket.error, OSError):
return False
def process_received_data(self, xml_string): def process_received_data(self, xml_string):
try: try:
@ -86,87 +78,9 @@ class NetworkProcess(multiprocessing.Process):
self.receive_variables[element.tag] = {k: float(v) for k, v in element.attrib.items()} self.receive_variables[element.tag] = {k: float(v) for k, v in element.attrib.items()}
else: else:
self.receive_variables[element.tag] = element.text self.receive_variables[element.tag] = element.text
# specifically capture IPOC from received message
if element.tag == "IPOC": if element.tag == "IPOC":
received_ipoc = int(element.text) received_ipoc = int(element.text)
self.receive_variables["IPOC"] = received_ipoc self.receive_variables["IPOC"] = received_ipoc
self.send_variables["IPOC"] = received_ipoc + 4 # Increment by 4 ms self.send_variables["IPOC"] = received_ipoc + 4
except Exception as e: except Exception as e:
print(f"[ERROR] Error parsing received message: {e}") logging.error(f"[ERROR] Error parsing received message: {e}")
def log_to_csv(self):
"""Write send/receive variables to the CSV log with safety flags."""
filename = self.log_filename.value.decode().strip()
if not filename:
return
try:
with open(filename, mode="a", newline="") as file:
writer = csv.writer(file)
# Write header if new
if file.tell() == 0:
headers = ["Timestamp", "IPOC"]
for k, v in self.send_variables.items():
if isinstance(v, dict):
headers += [f"Send.{k}.{subk}" for subk in v.keys()]
else:
headers.append(f"Send.{k}")
for k, v in self.receive_variables.items():
if isinstance(v, dict):
headers += [f"Receive.{k}.{subk}" for subk in v.keys()]
else:
headers.append(f"Receive.{k}")
headers.append("SafetyViolation")
writer.writerow(headers)
# Gather values safely
timestamp = time.strftime("%d-%m-%Y %H:%M:%S")
ipoc = self.receive_variables.get("IPOC", 0)
send_data = []
for k, v in self.send_variables.items():
if isinstance(v, dict):
send_data.extend([v.get(subk, "") for subk in v])
else:
send_data.append(v)
receive_data = []
for k, v in self.receive_variables.items():
if isinstance(v, dict):
receive_data.extend([v.get(subk, "") for subk in v])
else:
receive_data.append(v)
# Safety check
violation = "False"
try:
for var, val in self.send_variables.items():
if isinstance(val, dict):
for sub, subval in val.items():
path = f"{var}.{sub}"
self.safety_manager.validate(path, subval)
else:
self.safety_manager.validate(var, val)
except Exception as e:
violation = str(e)
writer.writerow([timestamp, ipoc] + send_data + receive_data + [violation])
except Exception as e:
print(f"[ERROR] Failed to log data to CSV: {e}")
def start_logging(self, filename):
"""Start logging RSI data to CSV."""
self.logging_active.value = True
self.log_filename.value = filename.encode()
print(f"✅ CSV Logging started: {filename}")
def stop_logging(self):
"""Stop logging RSI data."""
self.logging_active.value = False
print("🛑 CSV Logging stopped.")
def is_logging_active(self):
"""Return logging status."""
return self.logging_active.value

View File

@ -1,8 +1,9 @@
import logging
import pandas as pd import pandas as pd
import numpy as np import numpy as np
import json import json
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from .rsi_client import RSIClient
from .kuka_visualiser import KukaRSIVisualiser from .kuka_visualiser import KukaRSIVisualiser
from .krl_to_csv_parser import KRLParser from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl from .inject_rsi_to_krl import inject_rsi_to_krl
@ -22,14 +23,16 @@ class RSIAPI:
"""Initialize RSIAPI with an RSI client instance.""" """Initialize RSIAPI with an RSI client instance."""
self.thread = None self.thread = None
self.config_file = config_file self.config_file = config_file
self.client = None # Delay instantiation self.client = None
self.graph_process = None # Store graphing process self.graph_process = None
self.graphing_instance = None self.graphing_instance = None
self.graph_thread = None# self.graph_thread = None#
self.trajectory_queue = [] self.trajectory_queue = []
self.live_plotter = None self.live_plotter = None
self.live_plot_thread = None self.live_plot_thread = None
self._ensure_client()
def _ensure_client(self): def _ensure_client(self):
"""Ensure RSIClient is initialised before use.""" """Ensure RSIClient is initialised before use."""
if self.client is None: if self.client is None:
@ -37,15 +40,15 @@ class RSIAPI:
self.client = RSIClient(self.config_file) self.client = RSIClient(self.config_file)
def start_rsi(self): def start_rsi(self):
self._ensure_client()
self.thread = threading.Thread(target=self.client.start, daemon=True) self.thread = threading.Thread(target=self.client.start, daemon=True)
self.thread.start() self.thread.start()
return "RSI started in background." return "RSI started in background."
def stop_rsi(self): def stop_rsi(self):
"""Stop the RSI client.""" """Stop the RSI client."""
self.client.stop() self.client.stop()
return "RSI stopped." return "RSI stopped."
def generate_report(filename, format_type): def generate_report(filename, format_type):
""" """
@ -89,9 +92,9 @@ class RSIAPI:
plt.tight_layout() plt.tight_layout()
plt.savefig(output_path) plt.savefig(output_path)
else: else:
raise ValueError(f"Unsupported format: {format_type}") raise ValueError(f"Unsupported format: {format_type}")
return f"Report saved as {output_path}" return f"Report saved as {output_path}"
def update_variable(self, name, value): def update_variable(self, name, value):
if "." in name: if "." in name:
@ -103,13 +106,13 @@ class RSIAPI:
safe_value = self.client.safety_manager.validate(full_path, float(value)) safe_value = self.client.safety_manager.validate(full_path, float(value))
current[child] = safe_value current[child] = safe_value
self.client.send_variables[parent] = current self.client.send_variables[parent] = current
return f"Updated {name} to {safe_value}" return f"Updated {name} to {safe_value}"
else: else:
raise KeyError(f"Parent variable '{parent}' not found in send_variables") raise KeyError(f"Parent variable '{parent}' not found in send_variables")
else: else:
safe_value = self.client.safety_manager.validate(name, float(value)) safe_value = self.client.safety_manager.validate(name, float(value))
self.client.send_variables[name] = safe_value self.client.send_variables[name] = safe_value
return f"Updated {name} to {safe_value}" return f"Updated {name} to {safe_value}"
def show_variables(self): def show_variables(self):
"""Print available variable names in send and receive variables.""" """Print available variable names in send and receive variables."""
@ -169,7 +172,7 @@ class RSIAPI:
def reconnect(self): def reconnect(self):
"""Restart the network connection without stopping RSI.""" """Restart the network connection without stopping RSI."""
self.client.reconnect() self.client.reconnect()
return "Network connection restarted." return "Network connection restarted."
def toggle_digital_io(self, io_group, io_name, state): def toggle_digital_io(self, io_group, io_name, state):
""" """
@ -246,7 +249,7 @@ class RSIAPI:
str: Status message indicating plot success or failure. str: Status message indicating plot success or failure.
""" """
if not os.path.exists(csv_path): if not os.path.exists(csv_path):
return f"CSV file not found: {csv_path}" return f"CSV file not found: {csv_path}"
try: try:
plot_type = plot_type.lower() plot_type = plot_type.lower()
@ -272,20 +275,20 @@ class RSIAPI:
StaticPlotter.plot_motor_currents(csv_path) StaticPlotter.plot_motor_currents(csv_path)
case "deviation": case "deviation":
if overlay_path is None or not os.path.exists(overlay_path): if overlay_path is None or not os.path.exists(overlay_path):
return "Deviation plot requires a valid overlay CSV file." return "Deviation plot requires a valid overlay CSV file."
StaticPlotter.plot_deviation(csv_path, overlay_path) StaticPlotter.plot_deviation(csv_path, overlay_path)
case _: case _:
return f"Invalid plot type '{plot_type}'. Use one of: 3d, 2d_xy, 2d_xz, 2d_yz, position, velocity, acceleration, joints, force, deviation." return f"Invalid plot type '{plot_type}'. Use one of: 3d, 2d_xy, 2d_xz, 2d_yz, position, velocity, acceleration, joints, force, deviation."
return f"✅ Plot '{plot_type}' generated successfully." return f"✅ Plot '{plot_type}' generated successfully."
except Exception as e: except Exception as e:
return f"Failed to generate plot '{plot_type}': {str(e)}" return f"Failed to generate plot '{plot_type}': {str(e)}"
def start_live_plot(self, mode="3d", interval=100): def start_live_plot(self, mode="3d", interval=100):
if self.live_plotter and self.live_plotter.running: if self.live_plotter and self.live_plotter.running:
return "⚠️ Live plotting already active." return "Live plotting already active."
def runner(): def runner():
self.live_plotter = LivePlotter(self.client, mode=mode, interval=interval) self.live_plotter = LivePlotter(self.client, mode=mode, interval=interval)
@ -293,19 +296,19 @@ class RSIAPI:
self.live_plot_thread = Thread(target=runner, daemon=True) self.live_plot_thread = Thread(target=runner, daemon=True)
self.live_plot_thread.start() self.live_plot_thread.start()
return f"📈 Live plot started in '{mode}' mode at {interval}ms interval." return f"Live plot started in '{mode}' mode at {interval}ms interval."
def stop_live_plot(self): def stop_live_plot(self):
if self.live_plotter and self.live_plotter.running: if self.live_plotter and self.live_plotter.running:
self.live_plotter.stop() self.live_plotter.stop()
return "🛑 Live plotting stopped." return "Live plotting stopped."
return "⚠️ No live plot is currently running." return "No live plot is currently running."
def change_live_plot_mode(self, mode): def change_live_plot_mode(self, mode):
if self.live_plotter and self.live_plotter.running: if self.live_plotter and self.live_plotter.running:
self.live_plotter.change_mode(mode) self.live_plotter.change_mode(mode)
return f"🔄 Live plot mode changed to '{mode}'." return f"Live plot mode changed to '{mode}'."
return "⚠️ No live plot is active to change mode." return "No live plot is active to change mode."
@ -313,7 +316,7 @@ class RSIAPI:
def enable_alerts(self, enable): def enable_alerts(self, enable):
"""Enable or disable real-time alerts.""" """Enable or disable real-time alerts."""
self.client.enable_alerts(enable) self.client.enable_alerts(enable)
return f"Alerts {'enabled' if enable else 'disabled'}." return f"Alerts {'enabled' if enable else 'disabled'}."
def override_safety(self, enabled: bool): def override_safety(self, enabled: bool):
self.client.safety_manager.override_safety(enabled) self.client.safety_manager.override_safety(enabled)
@ -325,8 +328,8 @@ class RSIAPI:
"""Set threshold for deviation or force alerts.""" """Set threshold for deviation or force alerts."""
if alert_type in ["deviation", "force"]: if alert_type in ["deviation", "force"]:
self.client.set_alert_threshold(alert_type, value) self.client.set_alert_threshold(alert_type, value)
return f"{alert_type.capitalize()} alert threshold set to {value}" return f"{alert_type.capitalize()} alert threshold set to {value}"
return "Invalid alert type. Use 'deviation' or 'force'." return "Invalid alert type. Use 'deviation' or 'force'."
@staticmethod @staticmethod
def visualise_csv_log(csv_file, export=False): def visualise_csv_log(csv_file, export=False):
@ -360,9 +363,9 @@ class RSIAPI:
parser.parse_src() parser.parse_src()
parser.parse_dat() parser.parse_dat()
parser.export_csv(output_file) parser.export_csv(output_file)
return f"KRL data successfully exported to {output_file}" return f"KRL data successfully exported to {output_file}"
except Exception as e: except Exception as e:
return f"Error parsing KRL files: {e}" return f"Error parsing KRL files: {e}"
@staticmethod @staticmethod
def inject_rsi(input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"): def inject_rsi(input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"):
@ -377,9 +380,9 @@ class RSIAPI:
try: try:
inject_rsi_to_krl(input_krl, output_krl, rsi_config) inject_rsi_to_krl(input_krl, output_krl, rsi_config)
output_path = output_krl if output_krl else input_krl output_path = output_krl if output_krl else input_krl
return f"RSI successfully injected into {output_path}" return f"RSI successfully injected into {output_path}"
except Exception as e: except Exception as e:
return f"RSI injection failed: {e}" return f"RSI injection failed: {e}"
@staticmethod @staticmethod
def generate_trajectory(start, end, steps=100, space="cartesian", mode="absolute", include_resets=False): def generate_trajectory(start, end, steps=100, space="cartesian", mode="absolute", include_resets=False):
@ -403,7 +406,7 @@ class RSIAPI:
self.update_joints(**point) self.update_joints(**point)
else: else:
raise ValueError("space must be 'cartesian' or 'joint'") raise ValueError("space must be 'cartesian' or 'joint'")
print(f"🚀 Step {idx + 1}/{len(trajectory)} sent") print(f"Step {idx + 1}/{len(trajectory)} sent")
await asyncio.sleep(rate) await asyncio.sleep(rate)
try: try:
@ -453,7 +456,7 @@ class RSIAPI:
df = pd.DataFrame(data) df = pd.DataFrame(data)
df.to_csv(filename, index=False) df.to_csv(filename, index=False)
return f"Movement data exported to {filename}" return f"Movement data exported to {filename}"
@staticmethod @staticmethod
def compare_test_runs(file1, file2): def compare_test_runs(file1, file2):
@ -484,7 +487,7 @@ class RSIAPI:
""" """
self._ensure_client() self._ensure_client()
if "RKorr" not in self.client.send_variables: if "RKorr" not in self.client.send_variables:
print("⚠️ Warning: RKorr not configured in send_variables. Skipping Cartesian update.") logging.warning("Warning: RKorr not configured in send_variables. Skipping Cartesian update.")
return return
for axis, value in kwargs.items(): for axis, value in kwargs.items():
@ -496,7 +499,7 @@ class RSIAPI:
""" """
self._ensure_client() self._ensure_client()
if "AKorr" not in self.client.send_variables: if "AKorr" not in self.client.send_variables:
print("⚠️ Warning: AKorr not configured in send_variables. Skipping Joint update.") logging.warning("⚠️ Warning: AKorr not configured in send_variables. Skipping Joint update.")
return return
for axis, value in kwargs.items(): for axis, value in kwargs.items():
@ -510,7 +513,7 @@ class RSIAPI:
import time import time
import datetime import datetime
print("📡 Watching network... Press Ctrl+C to stop.\n") logging.info("Watching network... Press Ctrl+C to stop.\n")
start_time = time.time() start_time = time.time()
try: try:
@ -525,9 +528,7 @@ class RSIAPI:
break break
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n🛑 Stopped network watch.") logging.info("\nStopped network watch.")
# --- 🤖 High-level Cartesian and Joint Trajectory Movement ---
def move_cartesian_trajectory(self, start_pose, end_pose, steps=50, rate=0.012): def move_cartesian_trajectory(self, start_pose, end_pose, steps=50, rate=0.012):
""" """
@ -558,11 +559,11 @@ class RSIAPI:
Generate and queue a Cartesian movement (no execution). Generate and queue a Cartesian movement (no execution).
""" """
if not isinstance(start_pose, dict) or not isinstance(end_pose, dict): if not isinstance(start_pose, dict) or not isinstance(end_pose, dict):
raise ValueError("start_pose and end_pose must be dictionaries (e.g., {'X': 0, 'Y': 0, 'Z': 500})") raise ValueError("start_pose and end_pose must be dictionaries (e.g., {'X': 0, 'Y': 0, 'Z': 500})")
if steps <= 0: if steps <= 0:
raise ValueError("Steps must be greater than zero.") raise ValueError("Steps must be greater than zero.")
if rate <= 0: if rate <= 0:
raise ValueError("Rate must be greater than zero.") raise ValueError("Rate must be greater than zero.")
trajectory = self.generate_trajectory(start_pose, end_pose, steps=steps, space="cartesian") trajectory = self.generate_trajectory(start_pose, end_pose, steps=steps, space="cartesian")
self.queue_trajectory(trajectory, "cartesian", rate) self.queue_trajectory(trajectory, "cartesian", rate)
@ -572,11 +573,11 @@ class RSIAPI:
Generate and queue a Joint-space movement (no execution). Generate and queue a Joint-space movement (no execution).
""" """
if not isinstance(start_joints, dict) or not isinstance(end_joints, dict): if not isinstance(start_joints, dict) or not isinstance(end_joints, dict):
raise ValueError("start_joints and end_joints must be dictionaries (e.g., {'A1': 0, 'A2': 0})") raise ValueError("start_joints and end_joints must be dictionaries (e.g., {'A1': 0, 'A2': 0})")
if steps <= 0: if steps <= 0:
raise ValueError("Steps must be greater than zero.") raise ValueError("Steps must be greater than zero.")
if rate <= 0: if rate <= 0:
raise ValueError("Rate must be greater than zero.") raise ValueError("Rate must be greater than zero.")
trajectory = self.generate_trajectory(start_joints, end_joints, steps=steps, space="joint") trajectory = self.generate_trajectory(start_joints, end_joints, steps=steps, space="joint")
self.queue_trajectory(trajectory, "joint", rate) self.queue_trajectory(trajectory, "joint", rate)

View File

@ -1,375 +1,196 @@
from .rsi_api import RSIAPI from RSIPI.rsi_api import RSIAPI
from .kuka_visualiser import KukaRSIVisualiser
from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl
class RSICommandLineInterface: class RSICommandLineInterface:
"""Command-Line Interface for controlling RSI Client.""" """Command-Line Interface for controlling RSI Client."""
def __init__(self, input_config_file): def __init__(self, input_config_file):
"""Initialize CLI with an RSI API instance."""
self.client = RSIAPI(input_config_file) self.client = RSIAPI(input_config_file)
self.running = True self.running = True
def run(self): def run(self):
"""Starts the CLI interaction loop.""" print("RSI Command-Line Interface Started. Type 'help' for commands.")
print("✅ RSI Command-Line Interface Started. Type 'help' for commands.")
while self.running: while self.running:
command = input("RSI> ").strip().lower() try:
self.process_command(command) command = input("RSI> ").strip()
self.process_command(command)
except KeyboardInterrupt:
self.exit()
def process_command(self, command): def process_command(self, command):
"""Processes user input commands."""
parts = command.split() parts = command.split()
if not parts: if not parts:
return return
cmd = parts[0] cmd = parts[0].lower()
args = parts[1:]
if cmd == "start":
self.client.start_rsi()
elif cmd == "stop":
self.client.stop_rsi()
elif cmd == "set" and len(parts) >= 3:
variable, value = parts[1], " ".join(parts[2:])
self.client.update_variable(variable, value)
elif cmd == "alerts" and len(parts) == 2:
self.toggle_alerts(parts[1])
elif cmd == "set_alert_threshold" and len(parts) == 3:
self.set_alert_threshold(parts[1], parts[2])
elif cmd == "show":
if len(parts) == 0:
group = "all"
else:
group = parts[0]
self.client.show_variables(group)
elif cmd == "ipoc":
ipoc = self.client.get_ipoc()
print(f"🛰 Current IPOC: {ipoc}")
elif cmd == "watch":
duration = float(parts[0]) if parts else None
self.client.watch_network(duration=duration)
elif cmd == "reset":
self.client.reset_variables()
elif cmd == "status":
self.client.show_config_file()
elif cmd == "reconnect":
self.client.reconnect()
elif cmd == "toggle" and len(parts) == 3:
self.client.toggle_digital_io(parts[1], parts[2])
elif cmd == "move_external" and len(parts) == 3:
self.client.move_external_axis(parts[1], parts[2])
elif cmd == "correct" and len(parts) == 4:
self.client.correct_position(parts[1], parts[2], parts[3])
elif cmd == "speed" and len(parts) == 3:
self.client.adjust_speed(parts[1], parts[2])
elif cmd == "override" and len(parts) == 2:
state = parts[1]
if state in ["on", "true", "1"]:
self.client.override_safety(True)
print("🛡️ Safety override ENABLED.")
elif state in ["off", "false", "0"]:
self.client.override_safety(False)
print("🛡️ Safety override DISABLED.")
else:
print("❌ Usage: override on | off")
elif cmd == "log":
if len(parts) < 1:
print("⚠️ Usage: log start|stop|status")
return
subcmd = parts[0].lower()
if subcmd == "start":
filename = self.client.start_logging()
print(f"✅ Logging started → {filename}")
elif subcmd == "stop":
self.client.stop_logging()
print("🛑 Logging stopped.")
elif subcmd == "status":
status = self.client.is_logging_active()
print("📊 Logging is currently", "ACTIVE ✅" if status else "INACTIVE ❌")
else:
print("⚠️ Unknown log subcommand. Use: start, stop, status")
elif cmd == "graph":
if len(parts) == 0:
print("⚠️ Usage: graph show <file> | graph compare <file1> <file2>")
return
sub = parts[0].lower()
if sub == "show" and len(parts) == 2:
self.client.visualise_csv_log(parts[1])
elif sub == "compare" and len(parts) == 3:
self.client.compare_test_runs(parts[1], parts[2])
else:
print("⚠️ Usage:\n graph show <file>\n graph compare <file1> <file2>")
elif cmd == "export" and len(parts) == 2:
self.export_data(parts[1])
elif cmd == "compare" and len(parts) == 3:
self.compare_test_runs(parts[1], parts[2])
elif cmd == "report" and len(parts) >= 3:
self.generate_report(parts[1], parts[2])
elif cmd == "exit":
self.client.stop_rsi()
self.running = False
elif cmd == "help":
self.show_help()
elif cmd == "visualize" and len(parts) >= 2:
csv_file = parts[1]
export = ("export" in parts)
self.visualize(csv_file, export)
elif cmd == "krlparse" and len(parts) == 4:
src_file, dat_file, output_file = parts[1], parts[2], parts[3]
self.krl_parse(src_file, dat_file, output_file)
elif cmd == "inject_rsi" and len(parts) >= 2:
input_krl = parts[1]
output_krl = parts[2] if len(parts) >= 3 else None
rsi_config = parts[3] if len(parts) == 4 else "RSIGatewayv1.rsi"
self.inject_rsi(input_krl, output_krl, rsi_config)
elif cmd == "show" and len(parts) == 2 and parts[1] == "all":
variables = self.client.show_variables()
print("📤 Send Variables:")
for k, v in variables["send_variables"].items():
print(f" {k}: {v}")
print("📥 Receive Variables:")
for k, v in variables["receive_variables"].items():
print(f" {k}: {v}")
elif cmd == "show" and len(parts) == 2 and parts[1] == "live":
data = self.client.get_live_data()
print("📡 Live Data:")
for k, v in data.items():
print(f" {k}: {v}")
elif cmd == "log" and len(parts) == 2 and parts[1] == "status":
active = self.client.is_logging_active()
print(f"📋 Logging is {'ACTIVE' if active else 'INACTIVE'}")
elif cmd == "move_cartesian" and len(parts) >= 3:
start_dict = self.parse_pose_string(parts[1])
end_dict = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int)
rate = self.extract_optional_value(parts, "rate", default=0.04, cast_type=float)
trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="cartesian")
self.client.execute_trajectory(trajectory, space="cartesian", rate=rate)
elif cmd == "move_joint" and len(parts) >= 3:
start_dict = self.parse_pose_string(parts[1])
end_dict = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int)
rate = self.extract_optional_value(parts, "rate", default=0.4, cast_type=float)
trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="joint")
self.client.execute_trajectory(trajectory, space="joint", rate=rate)
elif cmd == "queue_cartesian" and len(parts) >= 3:
start = self.parse_pose_string(parts[1])
end = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", 50, int)
rate = self.extract_optional_value(parts, "rate", 0.04, float)
traj = self.client.generate_trajectory(start, end, steps, "cartesian")
self.client.queue_trajectory(traj, "cartesian", rate)
elif cmd == "queue_joint" and len(parts) >= 3:
start = self.parse_pose_string(parts[1])
end = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", 50, int)
rate = self.extract_optional_value(parts, "rate", 0.04, float)
traj = self.client.generate_trajectory(start, end, steps, "joint")
self.client.queue_trajectory(traj, "joint", rate)
elif cmd == "execute_queue":
self.client.execute_queued_trajectories()
elif cmd == "clear_queue":
self.client.clear_trajectory_queue()
elif cmd == "show_queue":
queue = self.client.get_trajectory_queue()
print(f"🧾 Trajectory Queue: {len(queue)} items")
for i, q in enumerate(queue):
print(f" {i + 1}. {q['space']} | {q['steps']} steps | {q['rate']}s")
elif cmd == "export_movement_data" and len(parts) == 2:
result = self.client.export_movement_data(parts[1])
print(result)
elif cmd == "compare_test_runs" and len(parts) == 3:
result = self.client.compare_test_runs(parts[1], parts[2])
print("📊 Comparison Results:")
for key, stats in result.items():
print(f"{key}: mean_diff={stats['mean_diff']:.3f}, max_diff={stats['max_diff']:.3f}")
elif cmd == "generate_report" and len(parts) in [2, 3]:
output = parts[2] if len(parts) == 3 else "report.txt"
result = generate_report(parts[1], output)
print(result)
elif cmd == "safety-stop":
self.client.safety_manager.emergency_stop()
print("🛑 Safety: Emergency Stop activated.")
elif cmd == "safety-reset":
self.client.safety_manager.reset_stop()
print("✅ Safety: Emergency Stop reset. Motion allowed.")
elif cmd == "safety-status":
sm = self.client.safety_manager
override_active = self.client.is_safety_overridden()
print("🛡️ Safety System Status:")
print(" 🧱 Emergency Stop:", "ACTIVE 🛑" if sm.is_stopped() else "CLEARED ✅")
print(" 🛡️ Safety Override:", "ENABLED" if override_active else "DISABLED")
print(" 📏 Enforced Limits:")
for var, (lo, hi) in sm.get_limits().items():
print(f" - {var}: {lo}{hi}")
elif cmd == "plot" and len(parts) >= 2:
plot_type = parts[1]
if len(parts) < 3:
print("⚠️ Usage: plot <type> <csv_path> [overlay_path]")
return
csv_path = parts[2]
overlay_path = parts[3] if len(parts) >= 4 else None
result = self.client.generate_plot(csv_path, plot_type=plot_type, overlay_path=overlay_path)
print(result)
elif cmd == "safety-set-limit" and len(parts) == 4:
var, lo, hi = parts[1], parts[2], parts[3]
try:
lo = float(lo)
hi = float(hi)
self.client.safety_manager.set_limit(var, lo, hi)
print(f"✅ Set limit for {var}: {lo} to {hi}")
except ValueError:
print("❌ Invalid numbers for limit. Usage: safety-set-limit RKorr.X -5 5")
else:
print("❌ Unknown command. Type 'help' for a list of commands.")
def toggle_alerts(self, state):
"""Enable or disable real-time alerts."""
if state == "on":
self.client.enable_alerts(True)
print("✅ Real-time alerts enabled.")
elif state == "off":
self.client.enable_alerts(False)
print("✅ Real-time alerts disabled.")
else:
print("❌ Invalid option. Use 'alerts on' or 'alerts off'.")
def set_alert_threshold(self, alert_type, value):
"""Set thresholds for deviation or force alerts."""
try: try:
value = float(value) match cmd:
if alert_type in ["deviation", "force"]: case "start":
self.client.set_alert_threshold(alert_type, value) print(self.client.start_rsi())
print(f"{alert_type.capitalize()} alert threshold set to {value}") case "stop":
else: print(self.client.stop_rsi())
print("❌ Invalid alert type. Use 'deviation' or 'force'.") case "exit":
except ValueError: self.exit()
print("❌ Invalid threshold value. Enter a numeric value.") case "set":
var, val = args[0], args[1]
def export_data(self, filename): print(self.client.update_variable(var, val))
"""Export movement data to a CSV file.""" case "show":
self.client.export_movement_data(filename) print("📤 Send Variables:")
print(f"✅ Data exported to {filename}") self.client.show_variables()
case "reset":
def compare_test_runs(self, file1, file2): print(self.client.reset_variables())
"""Compare two test runs from CSV files.""" case "status":
result = self.client.compare_test_runs(file1, file2) print(self.client.show_config_file())
print(result) case "ipoc":
print(f"🛰 IPOC: {self.client.get_ipoc()}")
def generate_report(self, filename, format_type): case "watch":
"""Generate a statistical report from movement data.""" duration = float(args[0]) if args else None
if format_type not in ["csv", "json", "pdf"]: self.client.watch_network(duration)
print("❌ Invalid format. Use 'csv', 'json', or 'pdf'.") case "reconnect":
return print(self.client.reconnect())
self.client.generate_report(filename, format_type) case "alerts":
print(f"✅ Report generated: {filename}.{format_type}") state = args[0].lower()
self.client.enable_alerts(state == "on")
case "set_alert_threshold":
def show_help(self): alert_type, value = args[0], float(args[1])
"""Displays the list of available commands.""" self.client.set_alert_threshold(alert_type, value)
print(""" case "toggle":
Available Commands: group, name, value = args
start, stop, exit print(self.client.toggle_digital_io(group, name, value))
set <var> <value>, show, ipoc, watch, reset, status, reconnect case "move_external":
toggle <DiO/DiL> <0/1>, move_external <axis> <value> axis, value = args
correct <RKorr/AKorr> <X/Y/Z/A/B/C> <value>, speed <Tech.TX> <value> print(self.client.move_external_axis(axis, value))
override <limit> case "correct":
log start <file>.csv, log stop, log status corr_type, axis, value = args
graph start <mode>, graph stop, graph mode <position|velocity|acceleration|force> print(self.client.correct_position(corr_type, axis, value))
graph overlay on/off, graph load_plan <file> case "speed":
export <filename.csv> tech_param, value = args
compare <file1.csv> <file2.csv> print(self.client.adjust_speed(tech_param, value))
report <filename> <csv|json|pdf> case "override":
alerts on/off state = args[0]
set_alert_threshold <deviation|force> <value> self.client.override_safety(state in ["on", "true", "1"])
show all - Show all current send and receive variables case "log":
show live - Show real-time TCP, force, and IPOC values subcmd = args[0]
log status - Display whether logging is currently active if subcmd == "start":
move_cartesian <start> <end> [steps=50] [rate=0.012] print(f"✅ Logging to {self.client.start_logging()}")
e.g., X=0,Y=0,Z=500 A=100,Y=0,Z=500 steps=100 rate=0.012 elif subcmd == "stop":
move_joint <start> <end> [steps=50] [rate=0.012] print(self.client.stop_logging())
e.g., A1=0,... A1=90,... steps=60 elif subcmd == "status":
queue_cartesian <start> <end> [steps=50] [rate=0.012] - Queue linear Cartesian trajectory print("📋", "ACTIVE" if self.client.is_logging_active() else "INACTIVE")
queue_joint <start> <end> [steps=50] [rate=0.012] - Queue linear Joint trajectory case "graph":
show_queue - Show queued trajectory segments sub = args[0]
clear_queue - Clear all queued trajectories if sub == "show":
execute_queue - Execute all queued motions self.client.visualise_csv_log(args[1])
export_movement_data <csvfile> - Export logged motion data to CSV elif sub == "compare":
compare_test_runs <file1> <file2> - Compare 2 test logs (e.g. deviation) print(self.client.compare_test_runs(args[1], args[2]))
generate_report <input.csv> [out.txt] - Create a movement analysis report case "plot":
safety-stop - Emergency stop: block motion plot_type, csv_path = args[0], args[1]
safety-reset - Reset emergency stop overlay = args[2] if len(args) > 2 else None
safety-status - Show safety and override status print(self.client.generate_plot(csv_path, plot_type, overlay))
override on/off - Enable or disable safety override case "move_cartesian":
alerts on/off start = self.parse_pose(args[0])
set_alert_threshold <deviation|force> <value> end = self.parse_pose(args[1])
""") steps = self.extract_value(args, "steps", 50, int)
rate = self.extract_value(args, "rate", 0.04, float)
def visualise(self, csv_file, export=False): self.client.move_cartesian_trajectory(start, end, steps, rate)
try: case "move_joint":
visualiser = KukaRSIVisualiser(csv_file) start = self.parse_pose(args[0])
visualiser.plot_trajectory() end = self.parse_pose(args[1])
visualiser.plot_joint_positions() steps = self.extract_value(args, "steps", 50, int)
visualiser.plot_force_trends() rate = self.extract_value(args, "rate", 0.04, float)
self.client.move_joint_trajectory(start, end, steps, rate)
if export: case "queue_cartesian":
visualiser.export_graphs() start = self.parse_pose(args[0])
print(f"✅ Visualisations exported for '{csv_file}'") end = self.parse_pose(args[1])
steps = self.extract_value(args, "steps", 50, int)
rate = self.extract_value(args, "rate", 0.04, float)
self.client.queue_cartesian_trajectory(start, end, steps, rate)
case "queue_joint":
start = self.parse_pose(args[0])
end = self.parse_pose(args[1])
steps = self.extract_value(args, "steps", 50, int)
rate = self.extract_value(args, "rate", 0.04, float)
self.client.queue_joint_trajectory(start, end, steps, rate)
case "execute_queue":
self.client.execute_queued_trajectories()
case "clear_queue":
self.client.clear_trajectory_queue()
case "show_queue":
print(self.client.get_trajectory_queue())
case "export_movement_data":
print(self.client.export_movement_data(args[0]))
case "compare_test_runs":
print(self.client.compare_test_runs(args[0], args[1]))
case "generate_report":
print(self.client.generate_report(args[0], args[1]))
case "safety-stop":
self.client.safety_stop()
case "safety-reset":
self.client.safety_reset()
case "safety-status":
print(self.client.safety_status())
case "safety-set-limit":
var, lo, hi = args
self.client.safety_set_limit(var, lo, hi)
case "krlparse":
self.client.parse_krl_to_csv(args[0], args[1], args[2])
case "inject_rsi":
input_krl = args[0]
output_krl = args[1] if len(args) > 1 else None
rsi_cfg = args[2] if len(args) > 2 else "RSIGatewayv1.rsi"
self.client.inject_rsi(input_krl, output_krl, rsi_cfg)
case "visualize":
self.client.visualise_csv_log(args[0], export="export" in args)
case "help":
self.show_help()
case _:
print("❌ Unknown command. Type 'help'.")
except Exception as e: except Exception as e:
print(f"❌ Failed to visualize '{csv_file}': {e}") print(f"❌ Error: {e}")
def krl_parse(self, src_file, dat_file, output_file): def parse_pose(self, pose_string):
"""CLI method to parse KRL files and output CSV.""" return dict(item.split("=") for item in pose_string.split(","))
try:
parser = KRLParser(src_file, dat_file)
parser.parse_src()
parser.parse_dat()
parser.export_csv(output_file)
print(f"✅ KRL files parsed successfully. Output CSV: {output_file}")
except Exception as e:
print(f"❌ Failed to parse KRL files: {e}")
def inject_rsi(self, input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"): def extract_value(self, args, key, default, cast_type):
"""Inject RSI commands into a KRL file via CLI.""" for arg in args[2:]:
try: if arg.startswith(f"{key}="):
inject_rsi_to_krl(input_krl, output_krl, rsi_config)
output_path = output_krl if output_krl else input_krl
print(f"✅ RSI commands successfully injected into '{output_path}'")
except Exception as e:
print(f"❌ Failed to inject RSI commands: {e}")
def extract_optional_value(self, parts, key, default=0, cast_type=float):
"""
Extracts optional arguments like 'steps=100' or 'rate=0.01'
"""
for part in parts[3:]: # skip cmd, start, end
if part.startswith(f"{key}="):
try: try:
return cast_type(part.split("=")[1]) return cast_type(arg.split("=")[1])
except ValueError: except ValueError:
return default return default
return default return default
def exit(self):
print("🛑 Exiting RSI CLI...")
self.client.stop_rsi()
self.running = False
def show_help(self):
print("""
Available Commands:
start, stop, exit
set <var> <value>
show, status, ipoc, watch, reset, reconnect
alerts on/off, set_alert_threshold <type> <value>
toggle <group> <name> <state>
move_external <axis> <value>, correct <RKorr/AKorr> <axis> <value>
speed <TechParam> <value>
log start|stop|status
graph show <csv> | graph compare <csv1> <csv2>
plot <type> <csv> [overlay]
move_cartesian, move_joint, queue_cartesian, queue_joint
execute_queue, clear_queue, show_queue
export_movement_data <file>
compare_test_runs <file1> <file2>
generate_report <file> <format>
safety-stop, safety-reset, safety-status, safety-set-limit
krlparse <src> <dat> <output>
inject_rsi <input> [output] [rsi_config]
visualize <csv> [export]
help
""")
if __name__ == "__main__": if __name__ == "__main__":
config_file = "RSI_EthernetConfig.xml" cli = RSICommandLineInterface("RSI_EthernetConfig.xml")
cli = RSICommandLineInterface(config_file)
cli.run() cli.run()

View File

@ -4,15 +4,14 @@ import time
from .config_parser import ConfigParser from .config_parser import ConfigParser
from .network_handler import NetworkProcess from .network_handler import NetworkProcess
from .safety_manager import SafetyManager from .safety_manager import SafetyManager
import threading
class RSIClient: class RSIClient:
"""Main RSI API class that integrates network, config handling, and message processing.""" """Main RSI API class that integrates network, config handling, and message processing."""
def __init__(self, config_file, rsi_limits_file=None): def __init__(self, config_file, rsi_limits_file=None):
"""Initialize the RSI client and set up logging and networking.""" logging.info(f"Loading RSI configuration from {config_file}...")
logging.info(f"📂 Loading RSI configuration from {config_file}...")
# Load configuration
self.config_parser = ConfigParser(config_file, rsi_limits_file) self.config_parser = ConfigParser(config_file, rsi_limits_file)
network_settings = self.config_parser.get_network_settings() network_settings = self.config_parser.get_network_settings()
@ -20,95 +19,73 @@ class RSIClient:
self.send_variables = self.manager.dict(self.config_parser.send_variables) self.send_variables = self.manager.dict(self.config_parser.send_variables)
self.receive_variables = self.manager.dict(self.config_parser.receive_variables) self.receive_variables = self.manager.dict(self.config_parser.receive_variables)
self.stop_event = multiprocessing.Event() self.stop_event = multiprocessing.Event()
self.start_event = multiprocessing.Event() # ✅ NEW
# ✅ Initialise safety manager from limits
self.safety_manager = SafetyManager(self.config_parser.safety_limits) self.safety_manager = SafetyManager(self.config_parser.safety_limits)
logging.info(f"🚀 Starting network process on {network_settings['ip']}:{network_settings['port']}...") # ✅ Create NetworkProcess but don't start communication yet
# ✅ Corrected constructor call with all necessary parameters
self.network_process = NetworkProcess( self.network_process = NetworkProcess(
network_settings["ip"], network_settings["ip"],
network_settings["port"], network_settings["port"],
self.send_variables, self.send_variables,
self.receive_variables, self.receive_variables,
self.stop_event, self.stop_event,
self.config_parser self.config_parser,
self.start_event
) )
self.network_process.start() self.network_process.start()
self.logger = None # Placeholder for logging module self.logger = None
def start(self): def start(self):
"""Keep the client running and allow periodic debugging.""" """Send start signal to NetworkProcess and run control loop."""
logging.info("✅ RSI Client Started") logging.info("RSIClient sending start signal to NetworkProcess...")
print("✅ RSI Client Started. Press CTRL+C to stop.") self.start_event.set()
self.running = True
logging.info("RSI Client Started")
try: try:
while not self.stop_event.is_set(): while self.running and not self.stop_event.is_set():
time.sleep(2) time.sleep(2)
except KeyboardInterrupt: except KeyboardInterrupt:
self.stop() self.stop()
except Exception as e: except Exception as e:
logging.error(f"❌ RSI Client encountered an error: {e}") logging.error(f"RSI Client encountered an error: {e}")
print(f"❌ RSI Client encountered an error: {e}")
def stop(self): def stop(self):
"""Stop the network process safely and close resources.""" """Stop the network process and the client thread safely."""
logging.info("🛑 Stopping RSI Client...") logging.info("🛑 Stopping RSI Client...")
print("🛑 Stopping RSI Client...")
self.stop_event.set() # ✅ Signal all processes to stop self.running = False
self.stop_event.set() # ✅ Tell network process to exit nicely
if self.network_process.is_alive(): if self.network_process and self.network_process.is_alive():
self.network_process.terminate() self.network_process.join(timeout=3) # ✅ Give it time to shutdown
self.network_process.join() if self.network_process.is_alive():
logging.warning("⚠️ Forcing network process termination...")
self.network_process.terminate()
self.network_process.join()
if hasattr(self, "thread") and self.thread and self.thread.is_alive():
self.thread.join()
self.thread = None
logging.info("✅ RSI Client Stopped") logging.info("✅ RSI Client Stopped")
print("✅ RSI Client Stopped")
def update_send_variable(self, name, value):
print(f"[DEBUG] update_send_variable called with: {name} = {value}")
if "." in name:
parent, child = name.split(".", 1)
if parent in self.send_variables:
current = dict(self.send_variables[parent]) # copy inner dict
current[child] = float(value)
self.send_variables[parent] = current # reassign to trigger proxy update
return f"✅ Updated {name} to {value}"
else:
return f"❌ Parent variable '{parent}' not found in send_variables"
else:
self.send_variables[name] = float(value)
return f"✅ Updated {name} to {value}"
def start_logging(self, filename):
if hasattr(self.network_process, "start_logging"):
self.network_process.start_logging(filename)
def stop_logging(self):
if hasattr(self.network_process, "stop_logging"):
self.network_process.stop_logging()
def is_logging_active(self):
return self.network_process.is_logging_active()
def enable_alerts(self, enable):
if hasattr(self.network_process, "enable_alerts"):
self.network_process.enable_alerts(enable)
def set_alert_threshold(self, alert_type, threshold):
if hasattr(self.network_process, "set_alert_threshold"):
self.network_process.set_alert_threshold(alert_type, threshold)
def reset_send_variables(self):
self.send_variables.update(self.config_parser.send_variables.copy())
def reconnect(self): def reconnect(self):
if self.network_process.is_alive(): """Reconnects the network process safely."""
logging.info("Reconnecting RSI Client network...")
if self.network_process and self.network_process.is_alive():
self.stop_event.set()
self.network_process.terminate() self.network_process.terminate()
self.network_process.join() self.network_process.join()
# Fresh new events
self.stop_event = multiprocessing.Event()
self.start_event = multiprocessing.Event()
# Create new network process
network_settings = self.config_parser.get_network_settings() network_settings = self.config_parser.get_network_settings()
self.network_process = NetworkProcess( self.network_process = NetworkProcess(
network_settings["ip"], network_settings["ip"],
@ -116,24 +93,11 @@ class RSIClient:
self.send_variables, self.send_variables,
self.receive_variables, self.receive_variables,
self.stop_event, self.stop_event,
self.config_parser self.config_parser,
self.start_event
) )
self.network_process.start() self.network_process.start()
# Fresh control thread
def get_movement_data(self): self.thread = threading.Thread(target=self.start, daemon=True)
"""Returns a list of all logged data entries.""" self.thread.start()
if hasattr(self, "logger") and self.logger:
return self.logger.get_all_records()
return []
if __name__ == "__main__":
config_file = "RSI_EthernetConfig.xml"
client = RSIClient(config_file)
try:
client.start()
except KeyboardInterrupt:
client.stop()

View File

@ -3,7 +3,7 @@ import logging
from src.RSIPI.rsi_limit_parser import parse_rsi_limits from src.RSIPI.rsi_limit_parser import parse_rsi_limits
# ✅ Configure Logging (toggleable) # ✅ Configure Logging (toggleable)
LOGGING_ENABLED = True # Change to False to silence logging output LOGGING_ENABLED = True # Change too False to silence logging output
if LOGGING_ENABLED: if LOGGING_ENABLED:
logging.basicConfig( logging.basicConfig(
@ -67,9 +67,9 @@ class RSIConfig:
if self.rsi_limits_file: if self.rsi_limits_file:
try: try:
self.safety_limits = parse_rsi_limits(self.rsi_limits_file) self.safety_limits = parse_rsi_limits(self.rsi_limits_file)
logging.info(f"Loaded safety limits from {self.rsi_limits_file}") logging.info(f"Loaded safety limits from {self.rsi_limits_file}")
except Exception as e: except Exception as e:
logging.warning(f"⚠️ Failed to load RSI safety limits: {e}") logging.warning(f"Failed to load RSI safety limits: {e}")
self.safety_limits = {} self.safety_limits = {}
@staticmethod @staticmethod

View File

@ -60,8 +60,8 @@ class EchoServer:
self.running = True self.running = True
self.thread = threading.Thread(target=self.send_message, daemon=True) self.thread = threading.Thread(target=self.send_message, daemon=True)
logging.info(f"Echo Server started on {self.server_address}") logging.info(f"Echo Server started on {self.server_address}")
print(f"Echo Server started in {self.mode.upper()} mode.") print(f"Echo Server started in {self.mode.upper()} mode.")
def receive_and_process(self): def receive_and_process(self):
""" """
@ -98,7 +98,7 @@ class EchoServer:
elif tag == "IPOC": elif tag == "IPOC":
self.ipoc_value = int(elem.text.strip()) self.ipoc_value = int(elem.text.strip())
logging.debug(f"Processed input: {ET.tostring(root).decode()}") logging.debug(f"Processed input: {ET.tostring(root).decode()}")
except socket.timeout: except socket.timeout:
pass # No data within delay window pass # No data within delay window
except ConnectionResetError: except ConnectionResetError:
@ -148,7 +148,7 @@ class EchoServer:
def stop(self): def stop(self):
"""Stops the echo server and cleans up the socket.""" """Stops the echo server and cleans up the socket."""
print("🛑 Stopping Echo Server...") print("Stopping Echo Server...")
self.running = False self.running = False
self.thread.join() self.thread.join()
self.udp_socket.close() self.udp_socket.close()

View File

@ -118,9 +118,9 @@ class RSIGraphing:
"""Switch graphing mode at runtime (position, velocity, acceleration, force).""" """Switch graphing mode at runtime (position, velocity, acceleration, force)."""
if mode in ["position", "velocity", "acceleration", "force"]: if mode in ["position", "velocity", "acceleration", "force"]:
self.mode = mode self.mode = mode
print(f"Graphing mode changed to: {mode}") print(f"Graphing mode changed to: {mode}")
else: else:
print("Invalid mode. Available: position, velocity, acceleration, force") print("Invalid mode. Available: position, velocity, acceleration, force")
def set_alert_threshold(self, alert_type, threshold): def set_alert_threshold(self, alert_type, threshold):
"""Update threshold values for alerts.""" """Update threshold values for alerts."""
@ -128,12 +128,12 @@ class RSIGraphing:
self.deviation_threshold = threshold self.deviation_threshold = threshold
elif alert_type == "force": elif alert_type == "force":
self.force_threshold = threshold self.force_threshold = threshold
print(f"{alert_type.capitalize()} alert threshold set to {threshold}") print(f"{alert_type.capitalize()} alert threshold set to {threshold}")
def enable_alerts(self, enable): def enable_alerts(self, enable):
"""Enable or disable real-time alerts.""" """Enable or disable real-time alerts."""
self.alerts_enabled = enable self.alerts_enabled = enable
print(f"Alerts {'enabled' if enable else 'disabled'}.") print(f"Alerts {'enabled' if enable else 'disabled'}.")
def stop(self): def stop(self):
"""Gracefully stop plotting by closing the figure.""" """Gracefully stop plotting by closing the figure."""