Fixed starting and stopping rsi in the cli. Tidied up print statements, converted to logging. Tidied up comments.

This commit is contained in:
Adam 2025-04-26 21:02:28 +01:00
parent 093adb35cb
commit c461233728
12 changed files with 445 additions and 626 deletions

View File

@ -1,3 +1,4 @@
import logging
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
class ConfigParser: class ConfigParser:
@ -70,9 +71,6 @@ class ConfigParser:
print(f"[WARNING] Failed to load .rsi.xml safety limits: {e}") print(f"[WARNING] Failed to load .rsi.xml safety limits: {e}")
self.safety_limits = {} self.safety_limits = {}
print(f"✅ Final Send Variables: {self.send_variables}")
print(f"✅ Final Receive Variables: {self.receive_variables}")
def process_config(self): def process_config(self):
""" """
Parses the RSI config file and builds the send/receive variable dictionaries. Parses the RSI config file and builds the send/receive variable dictionaries.
@ -90,7 +88,7 @@ class ConfigParser:
# Extract <CONFIG> section for IP/port/etc. # Extract <CONFIG> section for IP/port/etc.
config = root.find("CONFIG") config = root.find("CONFIG")
if config is None: if config is None:
raise ValueError("Missing <CONFIG> section in RSI_EthernetConfig.xml") raise ValueError("Missing <CONFIG> section in RSI_EthernetConfig.xml")
self.network_settings = { self.network_settings = {
"ip": config.find("IP_NUMBER").text.strip() if config.find("IP_NUMBER") is not None else None, "ip": config.find("IP_NUMBER").text.strip() if config.find("IP_NUMBER") is not None else None,
@ -102,7 +100,7 @@ class ConfigParser:
print(f"✅ Loaded network settings: {self.network_settings}") print(f"✅ Loaded network settings: {self.network_settings}")
if None in self.network_settings.values(): if None in self.network_settings.values():
raise ValueError("Missing one or more required network settings (ip, port, sentype, onlysend)") raise ValueError("Missing one or more required network settings (ip, port, sentype, onlysend)")
# Parse SEND section # Parse SEND section
send_section = root.find("SEND/ELEMENTS") send_section = root.find("SEND/ELEMENTS")
@ -123,7 +121,7 @@ class ConfigParser:
return send_vars, receive_vars return send_vars, receive_vars
except Exception as e: except Exception as e:
print(f"Error processing config file: {e}") logging.error(f"Error processing config file: {e}")
return {}, {} return {}, {}
def process_variable_structure(self, var_dict, tag, var_type, indx=""): def process_variable_structure(self, var_dict, tag, var_type, indx=""):
@ -138,24 +136,19 @@ class ConfigParser:
""" """
tag = tag.replace("DEF_", "") # Remove DEF_ prefix if present tag = tag.replace("DEF_", "") # Remove DEF_ prefix if present
print(f"🔍 Assigning {tag}: INDX={indx}, TYPE={var_type}")
if tag in self.internal_structure: if tag in self.internal_structure:
# If pre-defined internally, copy structure # If pre-defined internally, copy structure
internal_value = self.internal_structure[tag] internal_value = self.internal_structure[tag]
var_dict[tag] = internal_value.copy() if isinstance(internal_value, dict) else internal_value var_dict[tag] = internal_value.copy() if isinstance(internal_value, dict) else internal_value
print(f"✅ INTERNAL Match: {tag} -> {var_dict[tag]}")
elif "." in tag: elif "." in tag:
# Handle nested dictionary e.g. Tech.T21 -> { 'Tech': { 'T21': 0.0 } } # Handle nested dictionary e.g. Tech.T21 -> { 'Tech': { 'T21': 0.0 } }
parent, subkey = tag.split(".", 1) parent, subkey = tag.split(".", 1)
if parent not in var_dict: if parent not in var_dict:
var_dict[parent] = {} var_dict[parent] = {}
var_dict[parent][subkey] = self.get_default_value(var_type) var_dict[parent][subkey] = self.get_default_value(var_type)
print(f"📂 Assigned '{tag}' as nested dictionary under '{parent}': {var_dict[parent]}")
else: else:
# Standard single-value variable # Standard single-value variable
var_dict[tag] = self.get_default_value(var_type) var_dict[tag] = self.get_default_value(var_type)
print(f"📄 Assigned Standard Value: '{tag}' -> {var_dict[tag]}")
@staticmethod @staticmethod
def rename_tech_keys(var_dict): def rename_tech_keys(var_dict):
@ -171,7 +164,6 @@ class ConfigParser:
tech_data.update(var_dict.pop(key)) tech_data.update(var_dict.pop(key))
if tech_data: if tech_data:
var_dict["Tech"] = tech_data var_dict["Tech"] = tech_data
print(f"✅ Renamed Tech.XX keys to 'Tech': {var_dict['Tech']}")
@staticmethod @staticmethod
def get_default_value(var_type): def get_default_value(var_type):

View File

@ -1,4 +1,5 @@
import csv import csv
import logging
import re import re
from collections import OrderedDict from collections import OrderedDict
@ -28,8 +29,6 @@ class KRLParser:
if label not in self.labels_to_extract: if label not in self.labels_to_extract:
self.labels_to_extract.append(label) self.labels_to_extract.append(label)
print(f"📌 Found labels in .src: {self.labels_to_extract}")
def parse_dat(self): def parse_dat(self):
""" """
Parses the .dat file and retrieves Cartesian coordinates for each label. Parses the .dat file and retrieves Cartesian coordinates for each label.
@ -58,8 +57,6 @@ class KRLParser:
self.positions[label] = coords self.positions[label] = coords
print(f"📥 Parsed {len(self.positions)} positions from .dat")
def export_csv(self, output_file): def export_csv(self, output_file):
""" """
Writes the extracted Cartesian positions into a structured CSV file, Writes the extracted Cartesian positions into a structured CSV file,
@ -90,9 +87,9 @@ class KRLParser:
}) })
sequence_number += 1 sequence_number += 1
else: else:
print(f"⚠️ Skipped missing/deleted point: {label}") logging.warning(f"Skipped missing/deleted point: {label}")
print(f"CSV exported successfully to {output_file} with {sequence_number} points.") logging.info(f"CSV exported successfully to {output_file} with {sequence_number} points.")
# Optional CLI usage # Optional CLI usage

View File

@ -152,9 +152,9 @@ if __name__ == "__main__":
if args.limits: if args.limits:
from src.RSIPI.rsi_limit_parser import parse_rsi_limits from src.RSIPI.rsi_limit_parser import parse_rsi_limits
limits = parse_rsi_limits(args.limits) limits = parse_rsi_limits(args.limits)
visualiser = Kukarsivisualiser(args.csv_file, safety_limits=limits) visualiser = KukaRSIVisualiser(args.csv_file, safety_limits=limits)
else: else:
visualiser = Kukarsivisualiser(args.csv_file) visualiser = KukaRSIVisualiser(args.csv_file)
visualiser.plot_trajectory() visualiser.plot_trajectory()
visualiser.plot_joint_positions() visualiser.plot_joint_positions()

130
src/RSIPI/live_plotter.py Normal file
View File

@ -0,0 +1,130 @@
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from collections import deque
from threading import Thread, Lock
import time
class LivePlotter:
def __init__(self, client, mode="3d", interval=100):
self.client = client
self.mode = mode
self.interval = interval
self.running = False
# Plot data buffers
self.time_data = deque(maxlen=500)
self.position_data = {k: deque(maxlen=500) for k in ["X", "Y", "Z"]}
self.velocity_data = {k: deque(maxlen=500) for k in ["X", "Y", "Z"]}
self.acceleration_data = {k: deque(maxlen=500) for k in ["X", "Y", "Z"]}
self.joint_data = {f"A{i}": deque(maxlen=500) for i in range(1, 7)}
self.force_data = {f"A{i}": deque(maxlen=500) for i in range(1, 7)}
self.previous_positions = {"X": 0, "Y": 0, "Z": 0}
self.previous_velocities = {"X": 0, "Y": 0, "Z": 0}
self.previous_time = time.time()
self.lock = Lock()
self.collector_thread = None
self.fig = plt.figure()
self.ax = self.fig.add_subplot(111, projection="3d" if self.mode == "3d" else None)
def start(self):
self.running = True
self.collector_thread = Thread(target=self.collect_data_loop, daemon=True)
self.collector_thread.start()
self.ani = animation.FuncAnimation(self.fig, self.update_plot, interval=self.interval)
try:
plt.show()
except RuntimeError:
print("⚠️ Matplotlib GUI interrupted during shutdown.")
self.running = False
def stop(self, save_path: str = None):
self.running = False
if save_path:
try:
self.fig.savefig(save_path, bbox_inches="tight")
print(f"📸 Plot saved to '{save_path}'")
except Exception as e:
print(f"❌ Failed to save plot: {e}")
plt.close(self.fig)
def collect_data_loop(self):
while self.running:
with self.lock:
current_time = time.time()
dt = current_time - self.previous_time
self.previous_time = current_time
self.time_data.append(current_time)
position = self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0})
joints = self.client.receive_variables.get("AIPos", {f"A{i}": 0 for i in range(1, 7)})
force = self.client.receive_variables.get("MaCur", {f"A{i}": 0 for i in range(1, 7)})
for axis in ["X", "Y", "Z"]:
vel = (position[axis] - self.previous_positions[axis]) / dt if dt > 0 else 0
acc = (vel - self.previous_velocities[axis]) / dt if dt > 0 else 0
self.previous_positions[axis] = position[axis]
self.previous_velocities[axis] = vel
self.position_data[axis].append(position[axis])
self.velocity_data[axis].append(vel)
self.acceleration_data[axis].append(acc)
for i in range(1, 7):
self.joint_data[f"A{i}"].append(joints.get(f"A{i}", 0))
self.force_data[f"A{i}"].append(force.get(f"A{i}", 0))
time.sleep(self.interval / 1000.0)
def update_plot(self, frame):
if not self.running:
return
with self.lock:
self.ax.clear()
self.render_plot()
def render_plot(self):
if self.mode == "3d":
self.ax.set_title("Live 3D TCP Trajectory")
self.ax.plot(self.position_data["X"], self.position_data["Y"], self.position_data["Z"], label="TCP Path")
self.ax.set_xlabel("X")
self.ax.set_ylabel("Y")
self.ax.set_zlabel("Z")
elif self.mode == "2d_xy":
self.ax.set_title("Live 2D Trajectory (X-Y)")
self.ax.plot(self.position_data["X"], self.position_data["Y"], label="XY Path")
self.ax.set_xlabel("X")
self.ax.set_ylabel("Y")
elif self.mode == "velocity":
self.ax.set_title("Live TCP Velocity")
self.ax.plot(self.time_data, self.velocity_data["X"], label="dX/dt")
self.ax.plot(self.time_data, self.velocity_data["Y"], label="dY/dt")
self.ax.plot(self.time_data, self.velocity_data["Z"], label="dZ/dt")
self.ax.set_ylabel("Velocity [mm/s]")
elif self.mode == "acceleration":
self.ax.set_title("Live TCP Acceleration")
self.ax.plot(self.time_data, self.acceleration_data["X"], label="d²X/dt²")
self.ax.plot(self.time_data, self.acceleration_data["Y"], label="d²Y/dt²")
self.ax.plot(self.time_data, self.acceleration_data["Z"], label="d²Z/dt²")
self.ax.set_ylabel("Acceleration [mm/s²]")
elif self.mode == "joints":
self.ax.set_title("Live Joint Angles")
for j, values in self.joint_data.items():
self.ax.plot(self.time_data, values, label=j)
self.ax.set_ylabel("Angle [deg]")
elif self.mode == "force":
self.ax.set_title("Live Motor Currents")
for j, values in self.force_data.items():
self.ax.plot(self.time_data, values, label=j)
self.ax.set_ylabel("Current [Nm]")
self.ax.set_xlabel("Time")
self.ax.legend()
self.ax.grid(True)
self.fig.tight_layout()
def change_mode(self, mode):
self.mode = mode
self.ax = self.fig.add_subplot(111, projection="3d" if mode == "3d" else None)

View File

@ -1,16 +1,19 @@
from src.RSIPI.rsi_api import RSIAPI from src.RSIPI.rsi_api import RSIAPI
from src.RSIPI.live_plotter import LivePlotter
import math
import time import time
import threading
def main(): def main():
# Step 1: Create API instance
api = RSIAPI("RSI_EthernetConfig.xml") api = RSIAPI("RSI_EthernetConfig.xml")
time.sleep(10)
# Step 2: Start RSI connection
print("🔌 Starting RSI client...")
api.start_rsi()
time.sleep(10)
# Step 10: Stop RSI connection
print("🛑 Stopping RSI client...")
api.stop_rsi()
print("✅ All safety methods tested successfully.")
print(api.compare_test_runs("25-04-2025_16-33-47.csv", "25-04-2025_20-57-59.csv"))
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -1,81 +1,73 @@
import multiprocessing import multiprocessing
import socket import socket
import time
import csv
import logging import logging
import xml.etree.ElementTree as ET # ✅ FIX: Import ElementTree import xml.etree.ElementTree as ET
from .xml_handler import XMLGenerator from .xml_handler import XMLGenerator
from .safety_manager import SafetyManager from .safety_manager import SafetyManager
class NetworkProcess(multiprocessing.Process): class NetworkProcess(multiprocessing.Process):
"""Handles UDP communication and optional CSV logging in a separate process.""" """Handles UDP communication and optional CSV logging in a separate process."""
def __init__(self, ip, port, send_variables, receive_variables, stop_event, config_parser): def __init__(self, ip, port, send_variables, receive_variables, stop_event, config_parser, start_event):
super().__init__() super().__init__()
self.send_variables = send_variables self.send_variables = send_variables
self.receive_variables = receive_variables self.receive_variables = receive_variables
self.stop_event = stop_event self.stop_event = stop_event
self.start_event = start_event # ✅ NEW
self.config_parser = config_parser self.config_parser = config_parser
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.safety_manager = SafetyManager(config_parser.safety_limits) self.safety_manager = SafetyManager(config_parser.safety_limits)
self.client_address = (ip, port) self.client_address = (ip, port)
self.logging_active = multiprocessing.Value('b', False)
if not self.is_valid_ip(ip): self.log_filename = multiprocessing.Array('c', 256)
logging.warning(f"Invalid IP address '{ip}' detected. Falling back to '0.0.0.0'.")
print(f"⚠️ Invalid IP '{ip}', falling back to '0.0.0.0'.")
self.client_address = ('0.0.0.0', port)
else:
self.client_address = (ip, port)
self.logging_active = multiprocessing.Value('b', False) # Shared flag for logging
self.log_filename = multiprocessing.Array('c', 256) # Shared memory for filename
self.csv_process = None self.csv_process = None
self.controller_ip_and_port = None
def run(self):
"""Start the network loop."""
self.start_event.wait() # ✅ Wait until RSIClient sends start signal
try: try:
if not self.is_valid_ip(self.client_address[0]):
logging.warning(f"Invalid IP address '{self.client_address[0]}'. Falling back to '0.0.0.0'.")
self.client_address = ('0.0.0.0', self.client_address[1])
self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.udp_socket.bind(self.client_address) self.udp_socket.bind(self.client_address)
logging.info(f"✅ Network process initialized on {self.client_address}") logging.info(f"✅ Network process bound on {self.client_address}")
except OSError as e: except OSError as e:
logging.error(f"❌ Failed to bind to {self.client_address}: {e}") logging.error(f"❌ Failed to bind to {self.client_address}: {e}")
raise raise
self.controller_ip_and_port = None
@staticmethod
def is_valid_ip(ip):
"""Check if an IP address is valid and can be bound on this machine."""
try:
socket.inet_aton(ip) # Validate format
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.bind((ip, 0)) # Try binding
return True
except (socket.error, OSError):
return False
def run(self):
"""Start the network loop."""
print("[DEBUG] Network process started.")
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
self.udp_socket.settimeout(5) self.udp_socket.settimeout(5)
data_received, self.controller_ip_and_port = self.udp_socket.recvfrom(1024) data_received, self.controller_ip_and_port = self.udp_socket.recvfrom(1024)
#print("Receive: ", data_received)
message = data_received.decode() message = data_received.decode()
self.process_received_data(message) self.process_received_data(message)
#print("Network :", self.send_variables)
send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings) send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings)
#print("Send:", send_xml)
self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port) self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port)
# ✅ If logging is active, write data to CSV
if self.logging_active.value: if self.logging_active.value:
self.log_to_csv() self.log_to_csv()
except socket.timeout: except socket.timeout:
print("[WARNING] No message received within timeout period.") logging.error("[WARNING] No message received within timeout period.")
except Exception as e: except Exception as e:
print(f"[ERROR] Network process error: {e}") logging.error(f"[ERROR] Network process error: {e}")
@staticmethod
def is_valid_ip(ip):
try:
socket.inet_aton(ip)
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.bind((ip, 0))
return True
except (socket.error, OSError):
return False
def process_received_data(self, xml_string): def process_received_data(self, xml_string):
try: try:
@ -86,87 +78,9 @@ class NetworkProcess(multiprocessing.Process):
self.receive_variables[element.tag] = {k: float(v) for k, v in element.attrib.items()} self.receive_variables[element.tag] = {k: float(v) for k, v in element.attrib.items()}
else: else:
self.receive_variables[element.tag] = element.text self.receive_variables[element.tag] = element.text
# specifically capture IPOC from received message
if element.tag == "IPOC": if element.tag == "IPOC":
received_ipoc = int(element.text) received_ipoc = int(element.text)
self.receive_variables["IPOC"] = received_ipoc self.receive_variables["IPOC"] = received_ipoc
self.send_variables["IPOC"] = received_ipoc + 4 # Increment by 4 ms self.send_variables["IPOC"] = received_ipoc + 4
except Exception as e: except Exception as e:
print(f"[ERROR] Error parsing received message: {e}") logging.error(f"[ERROR] Error parsing received message: {e}")
def log_to_csv(self):
"""Write send/receive variables to the CSV log with safety flags."""
filename = self.log_filename.value.decode().strip()
if not filename:
return
try:
with open(filename, mode="a", newline="") as file:
writer = csv.writer(file)
# Write header if new
if file.tell() == 0:
headers = ["Timestamp", "IPOC"]
for k, v in self.send_variables.items():
if isinstance(v, dict):
headers += [f"Send.{k}.{subk}" for subk in v.keys()]
else:
headers.append(f"Send.{k}")
for k, v in self.receive_variables.items():
if isinstance(v, dict):
headers += [f"Receive.{k}.{subk}" for subk in v.keys()]
else:
headers.append(f"Receive.{k}")
headers.append("SafetyViolation")
writer.writerow(headers)
# Gather values safely
timestamp = time.strftime("%d-%m-%Y %H:%M:%S")
ipoc = self.receive_variables.get("IPOC", 0)
send_data = []
for k, v in self.send_variables.items():
if isinstance(v, dict):
send_data.extend([v.get(subk, "") for subk in v])
else:
send_data.append(v)
receive_data = []
for k, v in self.receive_variables.items():
if isinstance(v, dict):
receive_data.extend([v.get(subk, "") for subk in v])
else:
receive_data.append(v)
# Safety check
violation = "False"
try:
for var, val in self.send_variables.items():
if isinstance(val, dict):
for sub, subval in val.items():
path = f"{var}.{sub}"
self.safety_manager.validate(path, subval)
else:
self.safety_manager.validate(var, val)
except Exception as e:
violation = str(e)
writer.writerow([timestamp, ipoc] + send_data + receive_data + [violation])
except Exception as e:
print(f"[ERROR] Failed to log data to CSV: {e}")
def start_logging(self, filename):
"""Start logging RSI data to CSV."""
self.logging_active.value = True
self.log_filename.value = filename.encode()
print(f"✅ CSV Logging started: {filename}")
def stop_logging(self):
"""Stop logging RSI data."""
self.logging_active.value = False
print("🛑 CSV Logging stopped.")
def is_logging_active(self):
"""Return logging status."""
return self.logging_active.value

View File

@ -1,8 +1,9 @@
import logging
import pandas as pd import pandas as pd
import numpy as np import numpy as np
import json import json
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from .rsi_client import RSIClient
from .kuka_visualiser import KukaRSIVisualiser from .kuka_visualiser import KukaRSIVisualiser
from .krl_to_csv_parser import KRLParser from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl from .inject_rsi_to_krl import inject_rsi_to_krl
@ -22,14 +23,16 @@ class RSIAPI:
"""Initialize RSIAPI with an RSI client instance.""" """Initialize RSIAPI with an RSI client instance."""
self.thread = None self.thread = None
self.config_file = config_file self.config_file = config_file
self.client = None # Delay instantiation self.client = None
self.graph_process = None # Store graphing process self.graph_process = None
self.graphing_instance = None self.graphing_instance = None
self.graph_thread = None# self.graph_thread = None#
self.trajectory_queue = [] self.trajectory_queue = []
self.live_plotter = None self.live_plotter = None
self.live_plot_thread = None self.live_plot_thread = None
self._ensure_client()
def _ensure_client(self): def _ensure_client(self):
"""Ensure RSIClient is initialised before use.""" """Ensure RSIClient is initialised before use."""
if self.client is None: if self.client is None:
@ -37,15 +40,15 @@ class RSIAPI:
self.client = RSIClient(self.config_file) self.client = RSIClient(self.config_file)
def start_rsi(self): def start_rsi(self):
self._ensure_client()
self.thread = threading.Thread(target=self.client.start, daemon=True) self.thread = threading.Thread(target=self.client.start, daemon=True)
self.thread.start() self.thread.start()
return "RSI started in background." return "RSI started in background."
def stop_rsi(self): def stop_rsi(self):
"""Stop the RSI client.""" """Stop the RSI client."""
self.client.stop() self.client.stop()
return "RSI stopped." return "RSI stopped."
def generate_report(filename, format_type): def generate_report(filename, format_type):
""" """
@ -89,9 +92,9 @@ class RSIAPI:
plt.tight_layout() plt.tight_layout()
plt.savefig(output_path) plt.savefig(output_path)
else: else:
raise ValueError(f"Unsupported format: {format_type}") raise ValueError(f"Unsupported format: {format_type}")
return f"Report saved as {output_path}" return f"Report saved as {output_path}"
def update_variable(self, name, value): def update_variable(self, name, value):
if "." in name: if "." in name:
@ -103,13 +106,13 @@ class RSIAPI:
safe_value = self.client.safety_manager.validate(full_path, float(value)) safe_value = self.client.safety_manager.validate(full_path, float(value))
current[child] = safe_value current[child] = safe_value
self.client.send_variables[parent] = current self.client.send_variables[parent] = current
return f"Updated {name} to {safe_value}" return f"Updated {name} to {safe_value}"
else: else:
raise KeyError(f"Parent variable '{parent}' not found in send_variables") raise KeyError(f"Parent variable '{parent}' not found in send_variables")
else: else:
safe_value = self.client.safety_manager.validate(name, float(value)) safe_value = self.client.safety_manager.validate(name, float(value))
self.client.send_variables[name] = safe_value self.client.send_variables[name] = safe_value
return f"Updated {name} to {safe_value}" return f"Updated {name} to {safe_value}"
def show_variables(self): def show_variables(self):
"""Print available variable names in send and receive variables.""" """Print available variable names in send and receive variables."""
@ -169,7 +172,7 @@ class RSIAPI:
def reconnect(self): def reconnect(self):
"""Restart the network connection without stopping RSI.""" """Restart the network connection without stopping RSI."""
self.client.reconnect() self.client.reconnect()
return "Network connection restarted." return "Network connection restarted."
def toggle_digital_io(self, io_group, io_name, state): def toggle_digital_io(self, io_group, io_name, state):
""" """
@ -246,7 +249,7 @@ class RSIAPI:
str: Status message indicating plot success or failure. str: Status message indicating plot success or failure.
""" """
if not os.path.exists(csv_path): if not os.path.exists(csv_path):
return f"CSV file not found: {csv_path}" return f"CSV file not found: {csv_path}"
try: try:
plot_type = plot_type.lower() plot_type = plot_type.lower()
@ -272,20 +275,20 @@ class RSIAPI:
StaticPlotter.plot_motor_currents(csv_path) StaticPlotter.plot_motor_currents(csv_path)
case "deviation": case "deviation":
if overlay_path is None or not os.path.exists(overlay_path): if overlay_path is None or not os.path.exists(overlay_path):
return "Deviation plot requires a valid overlay CSV file." return "Deviation plot requires a valid overlay CSV file."
StaticPlotter.plot_deviation(csv_path, overlay_path) StaticPlotter.plot_deviation(csv_path, overlay_path)
case _: case _:
return f"Invalid plot type '{plot_type}'. Use one of: 3d, 2d_xy, 2d_xz, 2d_yz, position, velocity, acceleration, joints, force, deviation." return f"Invalid plot type '{plot_type}'. Use one of: 3d, 2d_xy, 2d_xz, 2d_yz, position, velocity, acceleration, joints, force, deviation."
return f"✅ Plot '{plot_type}' generated successfully." return f"✅ Plot '{plot_type}' generated successfully."
except Exception as e: except Exception as e:
return f"Failed to generate plot '{plot_type}': {str(e)}" return f"Failed to generate plot '{plot_type}': {str(e)}"
def start_live_plot(self, mode="3d", interval=100): def start_live_plot(self, mode="3d", interval=100):
if self.live_plotter and self.live_plotter.running: if self.live_plotter and self.live_plotter.running:
return "⚠️ Live plotting already active." return "Live plotting already active."
def runner(): def runner():
self.live_plotter = LivePlotter(self.client, mode=mode, interval=interval) self.live_plotter = LivePlotter(self.client, mode=mode, interval=interval)
@ -293,19 +296,19 @@ class RSIAPI:
self.live_plot_thread = Thread(target=runner, daemon=True) self.live_plot_thread = Thread(target=runner, daemon=True)
self.live_plot_thread.start() self.live_plot_thread.start()
return f"📈 Live plot started in '{mode}' mode at {interval}ms interval." return f"Live plot started in '{mode}' mode at {interval}ms interval."
def stop_live_plot(self): def stop_live_plot(self):
if self.live_plotter and self.live_plotter.running: if self.live_plotter and self.live_plotter.running:
self.live_plotter.stop() self.live_plotter.stop()
return "🛑 Live plotting stopped." return "Live plotting stopped."
return "⚠️ No live plot is currently running." return "No live plot is currently running."
def change_live_plot_mode(self, mode): def change_live_plot_mode(self, mode):
if self.live_plotter and self.live_plotter.running: if self.live_plotter and self.live_plotter.running:
self.live_plotter.change_mode(mode) self.live_plotter.change_mode(mode)
return f"🔄 Live plot mode changed to '{mode}'." return f"Live plot mode changed to '{mode}'."
return "⚠️ No live plot is active to change mode." return "No live plot is active to change mode."
@ -313,7 +316,7 @@ class RSIAPI:
def enable_alerts(self, enable): def enable_alerts(self, enable):
"""Enable or disable real-time alerts.""" """Enable or disable real-time alerts."""
self.client.enable_alerts(enable) self.client.enable_alerts(enable)
return f"Alerts {'enabled' if enable else 'disabled'}." return f"Alerts {'enabled' if enable else 'disabled'}."
def override_safety(self, enabled: bool): def override_safety(self, enabled: bool):
self.client.safety_manager.override_safety(enabled) self.client.safety_manager.override_safety(enabled)
@ -325,8 +328,8 @@ class RSIAPI:
"""Set threshold for deviation or force alerts.""" """Set threshold for deviation or force alerts."""
if alert_type in ["deviation", "force"]: if alert_type in ["deviation", "force"]:
self.client.set_alert_threshold(alert_type, value) self.client.set_alert_threshold(alert_type, value)
return f"{alert_type.capitalize()} alert threshold set to {value}" return f"{alert_type.capitalize()} alert threshold set to {value}"
return "Invalid alert type. Use 'deviation' or 'force'." return "Invalid alert type. Use 'deviation' or 'force'."
@staticmethod @staticmethod
def visualise_csv_log(csv_file, export=False): def visualise_csv_log(csv_file, export=False):
@ -360,9 +363,9 @@ class RSIAPI:
parser.parse_src() parser.parse_src()
parser.parse_dat() parser.parse_dat()
parser.export_csv(output_file) parser.export_csv(output_file)
return f"KRL data successfully exported to {output_file}" return f"KRL data successfully exported to {output_file}"
except Exception as e: except Exception as e:
return f"Error parsing KRL files: {e}" return f"Error parsing KRL files: {e}"
@staticmethod @staticmethod
def inject_rsi(input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"): def inject_rsi(input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"):
@ -377,9 +380,9 @@ class RSIAPI:
try: try:
inject_rsi_to_krl(input_krl, output_krl, rsi_config) inject_rsi_to_krl(input_krl, output_krl, rsi_config)
output_path = output_krl if output_krl else input_krl output_path = output_krl if output_krl else input_krl
return f"RSI successfully injected into {output_path}" return f"RSI successfully injected into {output_path}"
except Exception as e: except Exception as e:
return f"RSI injection failed: {e}" return f"RSI injection failed: {e}"
@staticmethod @staticmethod
def generate_trajectory(start, end, steps=100, space="cartesian", mode="absolute", include_resets=False): def generate_trajectory(start, end, steps=100, space="cartesian", mode="absolute", include_resets=False):
@ -403,7 +406,7 @@ class RSIAPI:
self.update_joints(**point) self.update_joints(**point)
else: else:
raise ValueError("space must be 'cartesian' or 'joint'") raise ValueError("space must be 'cartesian' or 'joint'")
print(f"🚀 Step {idx + 1}/{len(trajectory)} sent") print(f"Step {idx + 1}/{len(trajectory)} sent")
await asyncio.sleep(rate) await asyncio.sleep(rate)
try: try:
@ -453,7 +456,7 @@ class RSIAPI:
df = pd.DataFrame(data) df = pd.DataFrame(data)
df.to_csv(filename, index=False) df.to_csv(filename, index=False)
return f"Movement data exported to {filename}" return f"Movement data exported to {filename}"
@staticmethod @staticmethod
def compare_test_runs(file1, file2): def compare_test_runs(file1, file2):
@ -484,7 +487,7 @@ class RSIAPI:
""" """
self._ensure_client() self._ensure_client()
if "RKorr" not in self.client.send_variables: if "RKorr" not in self.client.send_variables:
print("⚠️ Warning: RKorr not configured in send_variables. Skipping Cartesian update.") logging.warning("Warning: RKorr not configured in send_variables. Skipping Cartesian update.")
return return
for axis, value in kwargs.items(): for axis, value in kwargs.items():
@ -496,7 +499,7 @@ class RSIAPI:
""" """
self._ensure_client() self._ensure_client()
if "AKorr" not in self.client.send_variables: if "AKorr" not in self.client.send_variables:
print("⚠️ Warning: AKorr not configured in send_variables. Skipping Joint update.") logging.warning("⚠️ Warning: AKorr not configured in send_variables. Skipping Joint update.")
return return
for axis, value in kwargs.items(): for axis, value in kwargs.items():
@ -510,7 +513,7 @@ class RSIAPI:
import time import time
import datetime import datetime
print("📡 Watching network... Press Ctrl+C to stop.\n") logging.info("Watching network... Press Ctrl+C to stop.\n")
start_time = time.time() start_time = time.time()
try: try:
@ -525,9 +528,7 @@ class RSIAPI:
break break
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n🛑 Stopped network watch.") logging.info("\nStopped network watch.")
# --- 🤖 High-level Cartesian and Joint Trajectory Movement ---
def move_cartesian_trajectory(self, start_pose, end_pose, steps=50, rate=0.012): def move_cartesian_trajectory(self, start_pose, end_pose, steps=50, rate=0.012):
""" """
@ -558,11 +559,11 @@ class RSIAPI:
Generate and queue a Cartesian movement (no execution). Generate and queue a Cartesian movement (no execution).
""" """
if not isinstance(start_pose, dict) or not isinstance(end_pose, dict): if not isinstance(start_pose, dict) or not isinstance(end_pose, dict):
raise ValueError("start_pose and end_pose must be dictionaries (e.g., {'X': 0, 'Y': 0, 'Z': 500})") raise ValueError("start_pose and end_pose must be dictionaries (e.g., {'X': 0, 'Y': 0, 'Z': 500})")
if steps <= 0: if steps <= 0:
raise ValueError("Steps must be greater than zero.") raise ValueError("Steps must be greater than zero.")
if rate <= 0: if rate <= 0:
raise ValueError("Rate must be greater than zero.") raise ValueError("Rate must be greater than zero.")
trajectory = self.generate_trajectory(start_pose, end_pose, steps=steps, space="cartesian") trajectory = self.generate_trajectory(start_pose, end_pose, steps=steps, space="cartesian")
self.queue_trajectory(trajectory, "cartesian", rate) self.queue_trajectory(trajectory, "cartesian", rate)
@ -572,11 +573,11 @@ class RSIAPI:
Generate and queue a Joint-space movement (no execution). Generate and queue a Joint-space movement (no execution).
""" """
if not isinstance(start_joints, dict) or not isinstance(end_joints, dict): if not isinstance(start_joints, dict) or not isinstance(end_joints, dict):
raise ValueError("start_joints and end_joints must be dictionaries (e.g., {'A1': 0, 'A2': 0})") raise ValueError("start_joints and end_joints must be dictionaries (e.g., {'A1': 0, 'A2': 0})")
if steps <= 0: if steps <= 0:
raise ValueError("Steps must be greater than zero.") raise ValueError("Steps must be greater than zero.")
if rate <= 0: if rate <= 0:
raise ValueError("Rate must be greater than zero.") raise ValueError("Rate must be greater than zero.")
trajectory = self.generate_trajectory(start_joints, end_joints, steps=steps, space="joint") trajectory = self.generate_trajectory(start_joints, end_joints, steps=steps, space="joint")
self.queue_trajectory(trajectory, "joint", rate) self.queue_trajectory(trajectory, "joint", rate)

View File

@ -1,375 +1,196 @@
from .rsi_api import RSIAPI from RSIPI.rsi_api import RSIAPI
from .kuka_visualiser import KukaRSIVisualiser
from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl
class RSICommandLineInterface: class RSICommandLineInterface:
"""Command-Line Interface for controlling RSI Client.""" """Command-Line Interface for controlling RSI Client."""
def __init__(self, input_config_file): def __init__(self, input_config_file):
"""Initialize CLI with an RSI API instance."""
self.client = RSIAPI(input_config_file) self.client = RSIAPI(input_config_file)
self.running = True self.running = True
def run(self): def run(self):
"""Starts the CLI interaction loop.""" print("RSI Command-Line Interface Started. Type 'help' for commands.")
print("✅ RSI Command-Line Interface Started. Type 'help' for commands.")
while self.running: while self.running:
command = input("RSI> ").strip().lower() try:
command = input("RSI> ").strip()
self.process_command(command) self.process_command(command)
except KeyboardInterrupt:
self.exit()
def process_command(self, command): def process_command(self, command):
"""Processes user input commands."""
parts = command.split() parts = command.split()
if not parts: if not parts:
return return
cmd = parts[0] cmd = parts[0].lower()
args = parts[1:]
if cmd == "start": try:
self.client.start_rsi() match cmd:
elif cmd == "stop": case "start":
self.client.stop_rsi() print(self.client.start_rsi())
elif cmd == "set" and len(parts) >= 3: case "stop":
variable, value = parts[1], " ".join(parts[2:]) print(self.client.stop_rsi())
self.client.update_variable(variable, value) case "exit":
elif cmd == "alerts" and len(parts) == 2: self.exit()
self.toggle_alerts(parts[1]) case "set":
elif cmd == "set_alert_threshold" and len(parts) == 3: var, val = args[0], args[1]
self.set_alert_threshold(parts[1], parts[2]) print(self.client.update_variable(var, val))
elif cmd == "show": case "show":
if len(parts) == 0: print("📤 Send Variables:")
group = "all" self.client.show_variables()
else: case "reset":
group = parts[0] print(self.client.reset_variables())
self.client.show_variables(group) case "status":
elif cmd == "ipoc": print(self.client.show_config_file())
ipoc = self.client.get_ipoc() case "ipoc":
print(f"🛰 Current IPOC: {ipoc}") print(f"🛰 IPOC: {self.client.get_ipoc()}")
elif cmd == "watch": case "watch":
duration = float(parts[0]) if parts else None duration = float(args[0]) if args else None
self.client.watch_network(duration=duration) self.client.watch_network(duration)
elif cmd == "reset": case "reconnect":
self.client.reset_variables() print(self.client.reconnect())
elif cmd == "status": case "alerts":
self.client.show_config_file() state = args[0].lower()
elif cmd == "reconnect": self.client.enable_alerts(state == "on")
self.client.reconnect() case "set_alert_threshold":
elif cmd == "toggle" and len(parts) == 3: alert_type, value = args[0], float(args[1])
self.client.toggle_digital_io(parts[1], parts[2]) self.client.set_alert_threshold(alert_type, value)
elif cmd == "move_external" and len(parts) == 3: case "toggle":
self.client.move_external_axis(parts[1], parts[2]) group, name, value = args
elif cmd == "correct" and len(parts) == 4: print(self.client.toggle_digital_io(group, name, value))
self.client.correct_position(parts[1], parts[2], parts[3]) case "move_external":
elif cmd == "speed" and len(parts) == 3: axis, value = args
self.client.adjust_speed(parts[1], parts[2]) print(self.client.move_external_axis(axis, value))
elif cmd == "override" and len(parts) == 2: case "correct":
state = parts[1] corr_type, axis, value = args
if state in ["on", "true", "1"]: print(self.client.correct_position(corr_type, axis, value))
self.client.override_safety(True) case "speed":
print("🛡️ Safety override ENABLED.") tech_param, value = args
elif state in ["off", "false", "0"]: print(self.client.adjust_speed(tech_param, value))
self.client.override_safety(False) case "override":
print("🛡️ Safety override DISABLED.") state = args[0]
else: self.client.override_safety(state in ["on", "true", "1"])
print("❌ Usage: override on | off") case "log":
elif cmd == "log": subcmd = args[0]
if len(parts) < 1:
print("⚠️ Usage: log start|stop|status")
return
subcmd = parts[0].lower()
if subcmd == "start": if subcmd == "start":
filename = self.client.start_logging() print(f"✅ Logging to {self.client.start_logging()}")
print(f"✅ Logging started → {filename}")
elif subcmd == "stop": elif subcmd == "stop":
self.client.stop_logging() print(self.client.stop_logging())
print("🛑 Logging stopped.")
elif subcmd == "status": elif subcmd == "status":
status = self.client.is_logging_active() print("📋", "ACTIVE" if self.client.is_logging_active() else "INACTIVE")
print("📊 Logging is currently", "ACTIVE ✅" if status else "INACTIVE ❌") case "graph":
sub = args[0]
if sub == "show":
self.client.visualise_csv_log(args[1])
elif sub == "compare":
print(self.client.compare_test_runs(args[1], args[2]))
case "plot":
plot_type, csv_path = args[0], args[1]
overlay = args[2] if len(args) > 2 else None
print(self.client.generate_plot(csv_path, plot_type, overlay))
case "move_cartesian":
start = self.parse_pose(args[0])
end = self.parse_pose(args[1])
steps = self.extract_value(args, "steps", 50, int)
rate = self.extract_value(args, "rate", 0.04, float)
self.client.move_cartesian_trajectory(start, end, steps, rate)
case "move_joint":
start = self.parse_pose(args[0])
end = self.parse_pose(args[1])
steps = self.extract_value(args, "steps", 50, int)
rate = self.extract_value(args, "rate", 0.04, float)
self.client.move_joint_trajectory(start, end, steps, rate)
case "queue_cartesian":
start = self.parse_pose(args[0])
end = self.parse_pose(args[1])
steps = self.extract_value(args, "steps", 50, int)
rate = self.extract_value(args, "rate", 0.04, float)
self.client.queue_cartesian_trajectory(start, end, steps, rate)
case "queue_joint":
start = self.parse_pose(args[0])
end = self.parse_pose(args[1])
steps = self.extract_value(args, "steps", 50, int)
rate = self.extract_value(args, "rate", 0.04, float)
self.client.queue_joint_trajectory(start, end, steps, rate)
case "execute_queue":
self.client.execute_queued_trajectories()
case "clear_queue":
self.client.clear_trajectory_queue()
case "show_queue":
print(self.client.get_trajectory_queue())
case "export_movement_data":
print(self.client.export_movement_data(args[0]))
case "compare_test_runs":
print(self.client.compare_test_runs(args[0], args[1]))
case "generate_report":
print(self.client.generate_report(args[0], args[1]))
case "safety-stop":
self.client.safety_stop()
case "safety-reset":
self.client.safety_reset()
case "safety-status":
print(self.client.safety_status())
case "safety-set-limit":
var, lo, hi = args
self.client.safety_set_limit(var, lo, hi)
case "krlparse":
self.client.parse_krl_to_csv(args[0], args[1], args[2])
case "inject_rsi":
input_krl = args[0]
output_krl = args[1] if len(args) > 1 else None
rsi_cfg = args[2] if len(args) > 2 else "RSIGatewayv1.rsi"
self.client.inject_rsi(input_krl, output_krl, rsi_cfg)
case "visualize":
self.client.visualise_csv_log(args[0], export="export" in args)
case "help":
self.show_help()
case _:
print("❌ Unknown command. Type 'help'.")
except Exception as e:
print(f"❌ Error: {e}")
else: def parse_pose(self, pose_string):
print("⚠️ Unknown log subcommand. Use: start, stop, status") return dict(item.split("=") for item in pose_string.split(","))
elif cmd == "graph":
if len(parts) == 0:
print("⚠️ Usage: graph show <file> | graph compare <file1> <file2>")
return
sub = parts[0].lower() def extract_value(self, args, key, default, cast_type):
for arg in args[2:]:
if arg.startswith(f"{key}="):
try:
return cast_type(arg.split("=")[1])
except ValueError:
return default
return default
if sub == "show" and len(parts) == 2: def exit(self):
self.client.visualise_csv_log(parts[1]) print("🛑 Exiting RSI CLI...")
elif sub == "compare" and len(parts) == 3:
self.client.compare_test_runs(parts[1], parts[2])
else:
print("⚠️ Usage:\n graph show <file>\n graph compare <file1> <file2>")
elif cmd == "export" and len(parts) == 2:
self.export_data(parts[1])
elif cmd == "compare" and len(parts) == 3:
self.compare_test_runs(parts[1], parts[2])
elif cmd == "report" and len(parts) >= 3:
self.generate_report(parts[1], parts[2])
elif cmd == "exit":
self.client.stop_rsi() self.client.stop_rsi()
self.running = False self.running = False
elif cmd == "help":
self.show_help()
elif cmd == "visualize" and len(parts) >= 2:
csv_file = parts[1]
export = ("export" in parts)
self.visualize(csv_file, export)
elif cmd == "krlparse" and len(parts) == 4:
src_file, dat_file, output_file = parts[1], parts[2], parts[3]
self.krl_parse(src_file, dat_file, output_file)
elif cmd == "inject_rsi" and len(parts) >= 2:
input_krl = parts[1]
output_krl = parts[2] if len(parts) >= 3 else None
rsi_config = parts[3] if len(parts) == 4 else "RSIGatewayv1.rsi"
self.inject_rsi(input_krl, output_krl, rsi_config)
elif cmd == "show" and len(parts) == 2 and parts[1] == "all":
variables = self.client.show_variables()
print("📤 Send Variables:")
for k, v in variables["send_variables"].items():
print(f" {k}: {v}")
print("📥 Receive Variables:")
for k, v in variables["receive_variables"].items():
print(f" {k}: {v}")
elif cmd == "show" and len(parts) == 2 and parts[1] == "live":
data = self.client.get_live_data()
print("📡 Live Data:")
for k, v in data.items():
print(f" {k}: {v}")
elif cmd == "log" and len(parts) == 2 and parts[1] == "status":
active = self.client.is_logging_active()
print(f"📋 Logging is {'ACTIVE' if active else 'INACTIVE'}")
elif cmd == "move_cartesian" and len(parts) >= 3:
start_dict = self.parse_pose_string(parts[1])
end_dict = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int)
rate = self.extract_optional_value(parts, "rate", default=0.04, cast_type=float)
trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="cartesian")
self.client.execute_trajectory(trajectory, space="cartesian", rate=rate)
elif cmd == "move_joint" and len(parts) >= 3:
start_dict = self.parse_pose_string(parts[1])
end_dict = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int)
rate = self.extract_optional_value(parts, "rate", default=0.4, cast_type=float)
trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="joint")
self.client.execute_trajectory(trajectory, space="joint", rate=rate)
elif cmd == "queue_cartesian" and len(parts) >= 3:
start = self.parse_pose_string(parts[1])
end = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", 50, int)
rate = self.extract_optional_value(parts, "rate", 0.04, float)
traj = self.client.generate_trajectory(start, end, steps, "cartesian")
self.client.queue_trajectory(traj, "cartesian", rate)
elif cmd == "queue_joint" and len(parts) >= 3:
start = self.parse_pose_string(parts[1])
end = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", 50, int)
rate = self.extract_optional_value(parts, "rate", 0.04, float)
traj = self.client.generate_trajectory(start, end, steps, "joint")
self.client.queue_trajectory(traj, "joint", rate)
elif cmd == "execute_queue":
self.client.execute_queued_trajectories()
elif cmd == "clear_queue":
self.client.clear_trajectory_queue()
elif cmd == "show_queue":
queue = self.client.get_trajectory_queue()
print(f"🧾 Trajectory Queue: {len(queue)} items")
for i, q in enumerate(queue):
print(f" {i + 1}. {q['space']} | {q['steps']} steps | {q['rate']}s")
elif cmd == "export_movement_data" and len(parts) == 2:
result = self.client.export_movement_data(parts[1])
print(result)
elif cmd == "compare_test_runs" and len(parts) == 3:
result = self.client.compare_test_runs(parts[1], parts[2])
print("📊 Comparison Results:")
for key, stats in result.items():
print(f"{key}: mean_diff={stats['mean_diff']:.3f}, max_diff={stats['max_diff']:.3f}")
elif cmd == "generate_report" and len(parts) in [2, 3]:
output = parts[2] if len(parts) == 3 else "report.txt"
result = generate_report(parts[1], output)
print(result)
elif cmd == "safety-stop":
self.client.safety_manager.emergency_stop()
print("🛑 Safety: Emergency Stop activated.")
elif cmd == "safety-reset":
self.client.safety_manager.reset_stop()
print("✅ Safety: Emergency Stop reset. Motion allowed.")
elif cmd == "safety-status":
sm = self.client.safety_manager
override_active = self.client.is_safety_overridden()
print("🛡️ Safety System Status:")
print(" 🧱 Emergency Stop:", "ACTIVE 🛑" if sm.is_stopped() else "CLEARED ✅")
print(" 🛡️ Safety Override:", "ENABLED" if override_active else "DISABLED")
print(" 📏 Enforced Limits:")
for var, (lo, hi) in sm.get_limits().items():
print(f" - {var}: {lo}{hi}")
elif cmd == "plot" and len(parts) >= 2:
plot_type = parts[1]
if len(parts) < 3:
print("⚠️ Usage: plot <type> <csv_path> [overlay_path]")
return
csv_path = parts[2]
overlay_path = parts[3] if len(parts) >= 4 else None
result = self.client.generate_plot(csv_path, plot_type=plot_type, overlay_path=overlay_path)
print(result)
elif cmd == "safety-set-limit" and len(parts) == 4:
var, lo, hi = parts[1], parts[2], parts[3]
try:
lo = float(lo)
hi = float(hi)
self.client.safety_manager.set_limit(var, lo, hi)
print(f"✅ Set limit for {var}: {lo} to {hi}")
except ValueError:
print("❌ Invalid numbers for limit. Usage: safety-set-limit RKorr.X -5 5")
else:
print("❌ Unknown command. Type 'help' for a list of commands.")
def toggle_alerts(self, state):
"""Enable or disable real-time alerts."""
if state == "on":
self.client.enable_alerts(True)
print("✅ Real-time alerts enabled.")
elif state == "off":
self.client.enable_alerts(False)
print("✅ Real-time alerts disabled.")
else:
print("❌ Invalid option. Use 'alerts on' or 'alerts off'.")
def set_alert_threshold(self, alert_type, value):
"""Set thresholds for deviation or force alerts."""
try:
value = float(value)
if alert_type in ["deviation", "force"]:
self.client.set_alert_threshold(alert_type, value)
print(f"{alert_type.capitalize()} alert threshold set to {value}")
else:
print("❌ Invalid alert type. Use 'deviation' or 'force'.")
except ValueError:
print("❌ Invalid threshold value. Enter a numeric value.")
def export_data(self, filename):
"""Export movement data to a CSV file."""
self.client.export_movement_data(filename)
print(f"✅ Data exported to {filename}")
def compare_test_runs(self, file1, file2):
"""Compare two test runs from CSV files."""
result = self.client.compare_test_runs(file1, file2)
print(result)
def generate_report(self, filename, format_type):
"""Generate a statistical report from movement data."""
if format_type not in ["csv", "json", "pdf"]:
print("❌ Invalid format. Use 'csv', 'json', or 'pdf'.")
return
self.client.generate_report(filename, format_type)
print(f"✅ Report generated: {filename}.{format_type}")
def show_help(self): def show_help(self):
"""Displays the list of available commands."""
print(""" print("""
Available Commands: Available Commands:
start, stop, exit start, stop, exit
set <var> <value>, show, ipoc, watch, reset, status, reconnect set <var> <value>
toggle <DiO/DiL> <0/1>, move_external <axis> <value> show, status, ipoc, watch, reset, reconnect
correct <RKorr/AKorr> <X/Y/Z/A/B/C> <value>, speed <Tech.TX> <value> alerts on/off, set_alert_threshold <type> <value>
override <limit> toggle <group> <name> <state>
log start <file>.csv, log stop, log status move_external <axis> <value>, correct <RKorr/AKorr> <axis> <value>
graph start <mode>, graph stop, graph mode <position|velocity|acceleration|force> speed <TechParam> <value>
graph overlay on/off, graph load_plan <file> log start|stop|status
export <filename.csv> graph show <csv> | graph compare <csv1> <csv2>
compare <file1.csv> <file2.csv> plot <type> <csv> [overlay]
report <filename> <csv|json|pdf> move_cartesian, move_joint, queue_cartesian, queue_joint
alerts on/off execute_queue, clear_queue, show_queue
set_alert_threshold <deviation|force> <value> export_movement_data <file>
show all - Show all current send and receive variables compare_test_runs <file1> <file2>
show live - Show real-time TCP, force, and IPOC values generate_report <file> <format>
log status - Display whether logging is currently active safety-stop, safety-reset, safety-status, safety-set-limit
move_cartesian <start> <end> [steps=50] [rate=0.012] krlparse <src> <dat> <output>
e.g., X=0,Y=0,Z=500 A=100,Y=0,Z=500 steps=100 rate=0.012 inject_rsi <input> [output] [rsi_config]
move_joint <start> <end> [steps=50] [rate=0.012] visualize <csv> [export]
e.g., A1=0,... A1=90,... steps=60 help
queue_cartesian <start> <end> [steps=50] [rate=0.012] - Queue linear Cartesian trajectory
queue_joint <start> <end> [steps=50] [rate=0.012] - Queue linear Joint trajectory
show_queue - Show queued trajectory segments
clear_queue - Clear all queued trajectories
execute_queue - Execute all queued motions
export_movement_data <csvfile> - Export logged motion data to CSV
compare_test_runs <file1> <file2> - Compare 2 test logs (e.g. deviation)
generate_report <input.csv> [out.txt] - Create a movement analysis report
safety-stop - Emergency stop: block motion
safety-reset - Reset emergency stop
safety-status - Show safety and override status
override on/off - Enable or disable safety override
alerts on/off
set_alert_threshold <deviation|force> <value>
""") """)
def visualise(self, csv_file, export=False):
try:
visualiser = KukaRSIVisualiser(csv_file)
visualiser.plot_trajectory()
visualiser.plot_joint_positions()
visualiser.plot_force_trends()
if export:
visualiser.export_graphs()
print(f"✅ Visualisations exported for '{csv_file}'")
except Exception as e:
print(f"❌ Failed to visualize '{csv_file}': {e}")
def krl_parse(self, src_file, dat_file, output_file):
"""CLI method to parse KRL files and output CSV."""
try:
parser = KRLParser(src_file, dat_file)
parser.parse_src()
parser.parse_dat()
parser.export_csv(output_file)
print(f"✅ KRL files parsed successfully. Output CSV: {output_file}")
except Exception as e:
print(f"❌ Failed to parse KRL files: {e}")
def inject_rsi(self, input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"):
"""Inject RSI commands into a KRL file via CLI."""
try:
inject_rsi_to_krl(input_krl, output_krl, rsi_config)
output_path = output_krl if output_krl else input_krl
print(f"✅ RSI commands successfully injected into '{output_path}'")
except Exception as e:
print(f"❌ Failed to inject RSI commands: {e}")
def extract_optional_value(self, parts, key, default=0, cast_type=float):
"""
Extracts optional arguments like 'steps=100' or 'rate=0.01'
"""
for part in parts[3:]: # skip cmd, start, end
if part.startswith(f"{key}="):
try:
return cast_type(part.split("=")[1])
except ValueError:
return default
return default
if __name__ == "__main__": if __name__ == "__main__":
config_file = "RSI_EthernetConfig.xml" cli = RSICommandLineInterface("RSI_EthernetConfig.xml")
cli = RSICommandLineInterface(config_file)
cli.run() cli.run()

View File

@ -4,15 +4,14 @@ import time
from .config_parser import ConfigParser from .config_parser import ConfigParser
from .network_handler import NetworkProcess from .network_handler import NetworkProcess
from .safety_manager import SafetyManager from .safety_manager import SafetyManager
import threading
class RSIClient: class RSIClient:
"""Main RSI API class that integrates network, config handling, and message processing.""" """Main RSI API class that integrates network, config handling, and message processing."""
def __init__(self, config_file, rsi_limits_file=None): def __init__(self, config_file, rsi_limits_file=None):
"""Initialize the RSI client and set up logging and networking.""" logging.info(f"Loading RSI configuration from {config_file}...")
logging.info(f"📂 Loading RSI configuration from {config_file}...")
# Load configuration
self.config_parser = ConfigParser(config_file, rsi_limits_file) self.config_parser = ConfigParser(config_file, rsi_limits_file)
network_settings = self.config_parser.get_network_settings() network_settings = self.config_parser.get_network_settings()
@ -20,95 +19,70 @@ class RSIClient:
self.send_variables = self.manager.dict(self.config_parser.send_variables) self.send_variables = self.manager.dict(self.config_parser.send_variables)
self.receive_variables = self.manager.dict(self.config_parser.receive_variables) self.receive_variables = self.manager.dict(self.config_parser.receive_variables)
self.stop_event = multiprocessing.Event() self.stop_event = multiprocessing.Event()
self.start_event = multiprocessing.Event() # ✅ NEW
# ✅ Initialise safety manager from limits
self.safety_manager = SafetyManager(self.config_parser.safety_limits) self.safety_manager = SafetyManager(self.config_parser.safety_limits)
logging.info(f"🚀 Starting network process on {network_settings['ip']}:{network_settings['port']}...") # ✅ Create NetworkProcess but don't start communication yet
# ✅ Corrected constructor call with all necessary parameters
self.network_process = NetworkProcess( self.network_process = NetworkProcess(
network_settings["ip"], network_settings["ip"],
network_settings["port"], network_settings["port"],
self.send_variables, self.send_variables,
self.receive_variables, self.receive_variables,
self.stop_event, self.stop_event,
self.config_parser self.config_parser,
self.start_event
) )
self.network_process.start() self.network_process.start()
self.logger = None # Placeholder for logging module self.logger = None
def start(self): def start(self):
"""Keep the client running and allow periodic debugging.""" """Send start signal to NetworkProcess and run control loop."""
logging.info("✅ RSI Client Started") logging.info("RSIClient sending start signal to NetworkProcess...")
print("✅ RSI Client Started. Press CTRL+C to stop.") self.start_event.set()
self.running = True
logging.info("RSI Client Started")
try: try:
while not self.stop_event.is_set(): while self.running and not self.stop_event.is_set():
time.sleep(2) time.sleep(2)
except KeyboardInterrupt: except KeyboardInterrupt:
self.stop() self.stop()
except Exception as e: except Exception as e:
logging.error(f"❌ RSI Client encountered an error: {e}") logging.error(f"RSI Client encountered an error: {e}")
print(f"❌ RSI Client encountered an error: {e}")
def stop(self): def stop(self):
"""Stop the network process safely and close resources.""" """Stop the network process and the client thread safely."""
logging.info("🛑 Stopping RSI Client...") logging.info("Stopping RSI Client...")
print("🛑 Stopping RSI Client...")
self.stop_event.set() # ✅ Signal all processes to stop self.running = False
self.stop_event.set()
if self.network_process.is_alive(): if self.network_process and self.network_process.is_alive():
self.network_process.terminate() self.network_process.terminate()
self.network_process.join() self.network_process.join()
logging.info("✅ RSI Client Stopped") if hasattr(self, "thread") and self.thread and self.thread.is_alive():
print("✅ RSI Client Stopped") self.thread.join()
self.thread = None
def update_send_variable(self, name, value): self.reconnect()
print(f"[DEBUG] update_send_variable called with: {name} = {value}") logging.info("RSI Client Stopped")
if "." in name:
parent, child = name.split(".", 1)
if parent in self.send_variables:
current = dict(self.send_variables[parent]) # copy inner dict
current[child] = float(value)
self.send_variables[parent] = current # reassign to trigger proxy update
return f"✅ Updated {name} to {value}"
else:
return f"❌ Parent variable '{parent}' not found in send_variables"
else:
self.send_variables[name] = float(value)
return f"✅ Updated {name} to {value}"
def start_logging(self, filename):
if hasattr(self.network_process, "start_logging"):
self.network_process.start_logging(filename)
def stop_logging(self):
if hasattr(self.network_process, "stop_logging"):
self.network_process.stop_logging()
def is_logging_active(self):
return self.network_process.is_logging_active()
def enable_alerts(self, enable):
if hasattr(self.network_process, "enable_alerts"):
self.network_process.enable_alerts(enable)
def set_alert_threshold(self, alert_type, threshold):
if hasattr(self.network_process, "set_alert_threshold"):
self.network_process.set_alert_threshold(alert_type, threshold)
def reset_send_variables(self):
self.send_variables.update(self.config_parser.send_variables.copy())
def reconnect(self): def reconnect(self):
if self.network_process.is_alive(): """Reconnects the network process safely."""
logging.info("Reconnecting RSI Client network...")
if self.network_process and self.network_process.is_alive():
self.stop_event.set()
self.network_process.terminate() self.network_process.terminate()
self.network_process.join() self.network_process.join()
# Fresh new events
self.stop_event = multiprocessing.Event()
self.start_event = multiprocessing.Event()
# Create new network process
network_settings = self.config_parser.get_network_settings() network_settings = self.config_parser.get_network_settings()
self.network_process = NetworkProcess( self.network_process = NetworkProcess(
network_settings["ip"], network_settings["ip"],
@ -116,24 +90,11 @@ class RSIClient:
self.send_variables, self.send_variables,
self.receive_variables, self.receive_variables,
self.stop_event, self.stop_event,
self.config_parser self.config_parser,
self.start_event
) )
self.network_process.start() self.network_process.start()
# Fresh control thread
def get_movement_data(self): self.thread = threading.Thread(target=self.start, daemon=True)
"""Returns a list of all logged data entries.""" self.thread.start()
if hasattr(self, "logger") and self.logger:
return self.logger.get_all_records()
return []
if __name__ == "__main__":
config_file = "RSI_EthernetConfig.xml"
client = RSIClient(config_file)
try:
client.start()
except KeyboardInterrupt:
client.stop()

View File

@ -3,7 +3,7 @@ import logging
from src.RSIPI.rsi_limit_parser import parse_rsi_limits from src.RSIPI.rsi_limit_parser import parse_rsi_limits
# ✅ Configure Logging (toggleable) # ✅ Configure Logging (toggleable)
LOGGING_ENABLED = True # Change to False to silence logging output LOGGING_ENABLED = True # Change too False to silence logging output
if LOGGING_ENABLED: if LOGGING_ENABLED:
logging.basicConfig( logging.basicConfig(
@ -67,9 +67,9 @@ class RSIConfig:
if self.rsi_limits_file: if self.rsi_limits_file:
try: try:
self.safety_limits = parse_rsi_limits(self.rsi_limits_file) self.safety_limits = parse_rsi_limits(self.rsi_limits_file)
logging.info(f"Loaded safety limits from {self.rsi_limits_file}") logging.info(f"Loaded safety limits from {self.rsi_limits_file}")
except Exception as e: except Exception as e:
logging.warning(f"⚠️ Failed to load RSI safety limits: {e}") logging.warning(f"Failed to load RSI safety limits: {e}")
self.safety_limits = {} self.safety_limits = {}
@staticmethod @staticmethod

View File

@ -60,8 +60,8 @@ class EchoServer:
self.running = True self.running = True
self.thread = threading.Thread(target=self.send_message, daemon=True) self.thread = threading.Thread(target=self.send_message, daemon=True)
logging.info(f"Echo Server started on {self.server_address}") logging.info(f"Echo Server started on {self.server_address}")
print(f"Echo Server started in {self.mode.upper()} mode.") print(f"Echo Server started in {self.mode.upper()} mode.")
def receive_and_process(self): def receive_and_process(self):
""" """
@ -98,7 +98,7 @@ class EchoServer:
elif tag == "IPOC": elif tag == "IPOC":
self.ipoc_value = int(elem.text.strip()) self.ipoc_value = int(elem.text.strip())
logging.debug(f"Processed input: {ET.tostring(root).decode()}") logging.debug(f"Processed input: {ET.tostring(root).decode()}")
except socket.timeout: except socket.timeout:
pass # No data within delay window pass # No data within delay window
except ConnectionResetError: except ConnectionResetError:
@ -148,7 +148,7 @@ class EchoServer:
def stop(self): def stop(self):
"""Stops the echo server and cleans up the socket.""" """Stops the echo server and cleans up the socket."""
print("🛑 Stopping Echo Server...") print("Stopping Echo Server...")
self.running = False self.running = False
self.thread.join() self.thread.join()
self.udp_socket.close() self.udp_socket.close()

View File

@ -118,9 +118,9 @@ class RSIGraphing:
"""Switch graphing mode at runtime (position, velocity, acceleration, force).""" """Switch graphing mode at runtime (position, velocity, acceleration, force)."""
if mode in ["position", "velocity", "acceleration", "force"]: if mode in ["position", "velocity", "acceleration", "force"]:
self.mode = mode self.mode = mode
print(f"Graphing mode changed to: {mode}") print(f"Graphing mode changed to: {mode}")
else: else:
print("Invalid mode. Available: position, velocity, acceleration, force") print("Invalid mode. Available: position, velocity, acceleration, force")
def set_alert_threshold(self, alert_type, threshold): def set_alert_threshold(self, alert_type, threshold):
"""Update threshold values for alerts.""" """Update threshold values for alerts."""
@ -128,12 +128,12 @@ class RSIGraphing:
self.deviation_threshold = threshold self.deviation_threshold = threshold
elif alert_type == "force": elif alert_type == "force":
self.force_threshold = threshold self.force_threshold = threshold
print(f"{alert_type.capitalize()} alert threshold set to {threshold}") print(f"{alert_type.capitalize()} alert threshold set to {threshold}")
def enable_alerts(self, enable): def enable_alerts(self, enable):
"""Enable or disable real-time alerts.""" """Enable or disable real-time alerts."""
self.alerts_enabled = enable self.alerts_enabled = enable
print(f"Alerts {'enabled' if enable else 'disabled'}.") print(f"Alerts {'enabled' if enable else 'disabled'}.")
def stop(self): def stop(self):
"""Gracefully stop plotting by closing the figure.""" """Gracefully stop plotting by closing the figure."""