diff --git a/src/RSIPI/config_parser.py b/src/RSIPI/config_parser.py index 705fea1..059e16c 100644 --- a/src/RSIPI/config_parser.py +++ b/src/RSIPI/config_parser.py @@ -1,14 +1,28 @@ import xml.etree.ElementTree as ET class ConfigParser: - """Handles parsing the RSI config file and generating structured send/receive variables.""" + """ + Parses an RSI XML configuration file to extract structured variable definitions and + network settings for both sending and receiving messages. Also integrates optional + safety limits from an RSI limits XML file. + """ def __init__(self, config_file, rsi_limits_file=None): - """Initialize with the configuration file and process send/receive structures.""" + """ + Constructor method that loads the config file, parses variable definitions, and optionally + loads safety limits. + + Args: + config_file (str): Path to the RSI_EthernetConfig.xml file. + rsi_limits_file (str, optional): Path to .rsi.xml file containing safety limits. + """ from src.RSIPI.rsi_limit_parser import parse_rsi_limits + + self.config_file = config_file self.rsi_limits_file = rsi_limits_file self.safety_limits = {} - self.config_file = config_file + + # Defines known internal variable structures used in RSI messaging self.internal_structure = { "ComStatus": "String", "RIst": {"X":0, "Y":0, "Z":0, "A":0, "B":0, "C":0}, @@ -36,18 +50,19 @@ class ConfigParser: "Tech.T5": {"T51":0, "T52":0, "T53":0, "T54":0, "T55":0, "T56":0, "T57":0, "T58":0, "T59":0, "T510":0}, "Tech.T6": {"T61":0, "T62":0, "T63":0, "T64":0, "T65":0, "T66":0, "T67":0, "T68":0, "T69":0, "T610":0}, } - self.network_settings = {} - self.receive_variables,self.send_variables = self.process_config() - # ✅ Rename Tech.XX keys to Tech in send and receive variables at the last step + self.network_settings = {} + self.receive_variables, self.send_variables = self.process_config() + + # Flatten Tech.CX and Tech.TX keys into a single 'Tech' dictionary self.rename_tech_keys(self.send_variables) self.rename_tech_keys(self.receive_variables) - # ✅ Ensure IPOC is always present in send variables + # Ensure IPOC is always included in send variables if "IPOC" not in self.send_variables: - self.send_variables.update({"IPOC": 0}) + self.send_variables["IPOC"] = 0 - # Load safety limits from .rsi.xml if provided + # Optionally load safety limits from .rsi.xml file if self.rsi_limits_file: try: self.safety_limits = parse_rsi_limits(self.rsi_limits_file) @@ -59,7 +74,12 @@ class ConfigParser: print(f"✅ Final Receive Variables: {self.receive_variables}") def process_config(self): - """Parse the config file and create structured send and receive variables.""" + """ + Parses the RSI config file and builds the send/receive variable dictionaries. + + Returns: + tuple: (send_vars, receive_vars) structured dictionaries. + """ send_vars = {} receive_vars = {} @@ -67,7 +87,7 @@ class ConfigParser: tree = ET.parse(self.config_file) root = tree.getroot() - # ✅ Extract network settings + # Extract section for IP/port/etc. config = root.find("CONFIG") if config is None: raise ValueError("❌ Missing section in RSI_EthernetConfig.xml") @@ -76,17 +96,15 @@ class ConfigParser: "ip": config.find("IP_NUMBER").text.strip() if config.find("IP_NUMBER") is not None else None, "port": int(config.find("PORT").text.strip()) if config.find("PORT") is not None else None, "sentype": config.find("SENTYPE").text.strip() if config.find("SENTYPE") is not None else None, - "onlysend": config.find("ONLYSEND").text.strip().upper() == "TRUE" if config.find( - "ONLYSEND") is not None else False, + "onlysend": config.find("ONLYSEND").text.strip().upper() == "TRUE" if config.find("ONLYSEND") is not None else False, } - # ✅ Debugging output to check loaded values print(f"✅ Loaded network settings: {self.network_settings}") if None in self.network_settings.values(): raise ValueError("❌ Missing one or more required network settings (ip, port, sentype, onlysend)") - # ✅ Parse SEND section + # Parse SEND section send_section = root.find("SEND/ELEMENTS") if send_section is not None: for element in send_section.findall("ELEMENT"): @@ -94,7 +112,7 @@ class ConfigParser: var_type = element.get("TYPE", "") self.process_variable_structure(send_vars, tag, var_type) - # ✅ Parse RECEIVE section + # Parse RECEIVE section receive_section = root.find("RECEIVE/ELEMENTS") if receive_section is not None: for element in receive_section.findall("ELEMENT"): @@ -102,9 +120,6 @@ class ConfigParser: var_type = element.get("TYPE", "") self.process_variable_structure(receive_vars, tag, var_type) - print(f"✅ Final Send Variables: {send_vars}") - print(f"✅ Final Receive Variables: {receive_vars}") - return send_vars, receive_vars except Exception as e: @@ -112,42 +127,63 @@ class ConfigParser: return {}, {} def process_variable_structure(self, var_dict, tag, var_type, indx=""): - """Handles structured and simple variables based on internal structure and TYPE attribute.""" - tag = tag.replace("DEF_", "") # ✅ Strip "DEF_" prefix + """ + Processes and assigns a variable to the dictionary based on its tag and type. + + Args: + var_dict (dict): Dictionary to add variable to. + tag (str): Variable tag (can be nested like Tech.T1). + var_type (str): Variable type (e.g. BOOL, DOUBLE, STRING). + indx (str): Optional index (unused). + """ + tag = tag.replace("DEF_", "") # Remove DEF_ prefix if present print(f"🔍 Assigning {tag}: INDX={indx}, TYPE={var_type}") if tag in self.internal_structure: + # If pre-defined internally, copy structure internal_value = self.internal_structure[tag] - if isinstance(internal_value, dict): - var_dict[tag] = internal_value.copy() - else: - var_dict[tag] = internal_value # ✅ Assign default internal value (e.g., "EStr Test") + var_dict[tag] = internal_value.copy() if isinstance(internal_value, dict) else internal_value print(f"✅ INTERNAL Match: {tag} -> {var_dict[tag]}") elif "." in tag: + # Handle nested dictionary e.g. Tech.T21 -> { 'Tech': { 'T21': 0.0 } } parent, subkey = tag.split(".", 1) if parent not in var_dict: var_dict[parent] = {} var_dict[parent][subkey] = self.get_default_value(var_type) print(f"📂 Assigned '{tag}' as nested dictionary under '{parent}': {var_dict[parent]}") else: + # Standard single-value variable var_dict[tag] = self.get_default_value(var_type) print(f"📄 Assigned Standard Value: '{tag}' -> {var_dict[tag]}") @staticmethod def rename_tech_keys(var_dict): - """Rename Tech.XX keys to Tech in send and receive variables at the last step.""" + """ + Combines all Tech.XX keys into a single 'Tech' dictionary. + + Args: + var_dict (dict): The variable dictionary to modify. + """ tech_data = {} for key in list(var_dict.keys()): if key.startswith("Tech."): - tech_data.update(var_dict.pop(key)) # ✅ Merge all Tech.XX keys into Tech + tech_data.update(var_dict.pop(key)) if tech_data: - var_dict["Tech"] = tech_data # ✅ Store under Tech + var_dict["Tech"] = tech_data print(f"✅ Renamed Tech.XX keys to 'Tech': {var_dict['Tech']}") @staticmethod def get_default_value(var_type): - """Returns a default value based on TYPE.""" + """ + Returns a default Python value based on RSI TYPE. + + Args: + var_type (str): RSI type attribute. + + Returns: + Default Python value. + """ if var_type == "BOOL": return False elif var_type == "STRING": @@ -156,8 +192,13 @@ class ConfigParser: return 0 elif var_type == "DOUBLE": return 0.0 - return None # Unknown type fallback + return None def get_network_settings(self): - """Return the extracted network settings.""" + """ + Returns extracted IP, port, and message mode settings. + + Returns: + dict: Network settings extracted from the config file. + """ return self.network_settings diff --git a/src/RSIPI/echo_server_gui.py b/src/RSIPI/echo_server_gui.py new file mode 100644 index 0000000..a977c75 --- /dev/null +++ b/src/RSIPI/echo_server_gui.py @@ -0,0 +1,201 @@ +import tkinter as tk +from tkinter import ttk, filedialog +import threading +import time +from src.RSIPI.rsi_echo_server import EchoServer +import matplotlib.pyplot as plt +from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg +from mpl_toolkits.mplot3d import Axes3D +import numpy as np +import os +import math + + +class EchoServerGUI: + """ + Graphical interface for running and visualising the RSI Echo Server. + Provides live feedback of robot TCP position and joint states, along with XML message logs. + """ + + def __init__(self, master): + """ + Initialises the GUI, default values, and layout. + + Args: + master (tk.Tk): Root tkinter window. + """ + self.master = master + self.master.title("RSI Echo Server GUI") + self.master.geometry("1300x800") + + # Configurable input variables + self.config_file = tk.StringVar(value="RSI_EthernetConfig.xml") + self.mode = tk.StringVar(value="relative") + self.delay = tk.IntVar(value=4) + self.show_robot = tk.BooleanVar(value=True) + + # Internal state + self.server = None + self.visual_thread = None + self.running = False + self.trace = [] + + self.create_widgets() + + def create_widgets(self): + """Create and layout all UI elements including buttons, entry fields, and plots.""" + frame = ttk.Frame(self.master) + frame.pack(pady=10) + + # Config file input + ttk.Label(frame, text="RSI Config File:").grid(row=0, column=0, padx=5) + ttk.Entry(frame, textvariable=self.config_file, width=50).grid(row=0, column=1, padx=5) + ttk.Button(frame, text="Browse", command=self.browse_file).grid(row=0, column=2) + + # Mode selection + ttk.Label(frame, text="Mode:").grid(row=1, column=0, padx=5) + ttk.Combobox(frame, textvariable=self.mode, values=["relative", "absolute"], width=10).grid(row=1, column=1, sticky='w') + + # Delay input + ttk.Label(frame, text="Delay (ms):").grid(row=2, column=0, padx=5) + ttk.Entry(frame, textvariable=self.delay, width=10).grid(row=2, column=1, sticky='w') + + # Show/hide robot checkbox + ttk.Checkbutton(frame, text="Show Robot Stick Figure", variable=self.show_robot).grid(row=3, column=0, sticky='w') + + # Start/Stop buttons + ttk.Button(frame, text="Start Server", command=self.start_server).grid(row=4, column=0, pady=10) + ttk.Button(frame, text="Stop Server", command=self.stop_server).grid(row=4, column=1, pady=10) + + # Status label + self.status_label = ttk.Label(frame, text="Status: Idle") + self.status_label.grid(row=5, column=0, columnspan=3) + + # 3D Plot setup + self.figure = plt.Figure(figsize=(6, 5)) + self.ax = self.figure.add_subplot(111, projection='3d') + self.canvas = FigureCanvasTkAgg(self.figure, master=self.master) + self.canvas.get_tk_widget().pack(side=tk.LEFT, fill=tk.BOTH, expand=1) + + # XML message display + right_frame = ttk.Frame(self.master) + right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=1) + + ttk.Label(right_frame, text="📤 Sent Message").pack() + self.sent_box = tk.Text(right_frame, height=15, width=70) + self.sent_box.pack(pady=5) + + ttk.Label(right_frame, text="📩 Received Message").pack() + self.received_box = tk.Text(right_frame, height=15, width=70) + self.received_box.pack(pady=5) + + def browse_file(self): + """Open a file dialog to select a new RSI config file.""" + filename = filedialog.askopenfilename(filetypes=[("XML Files", "*.xml")]) + if filename: + self.config_file.set(filename) + + def start_server(self): + """ + Starts the Echo Server in a background thread and begins the visual update loop. + Validates the existence of the config file first. + """ + if not os.path.exists(self.config_file.get()): + self.status_label.config(text="❌ Config file not found.") + return + + self.server = EchoServer( + config_file=self.config_file.get(), + delay_ms=self.delay.get(), + mode=self.mode.get() + ) + self.server.start() + self.running = True + self.status_label.config(text=f"✅ Server running in {self.mode.get().upper()} mode.") + self.visual_thread = threading.Thread(target=self.update_visualisation, daemon=True) + self.visual_thread.start() + + def stop_server(self): + """Stops the Echo Server and ends the visual update thread.""" + if self.server: + self.server.stop() + self.status_label.config(text="😕 Server stopped.") + self.running = False + + def update_visualisation(self): + """ + Continuously updates the 3D plot and message windows with live robot TCP and joint data. + Also displays simplified robot kinematics as a stick figure if enabled. + """ + while self.running: + try: + pos = self.server.state.get("RIst", {}) + joints = self.server.state.get("AIPos", {}) + x = pos.get("X", 0) + y = pos.get("Y", 0) + z = pos.get("Z", 0) + + # Track TCP trace history (max 300 points) + self.trace.append((x, y, z)) + if len(self.trace) > 300: + self.trace.pop(0) + + self.ax.clear() + self.ax.set_title("3D Robot Movement Trace") + self.ax.set_xlabel("X") + self.ax.set_ylabel("Y") + self.ax.set_zlabel("Z") + + # Draw shaded base plane + floor_x, floor_y = np.meshgrid(np.linspace(-200, 200, 2), np.linspace(-200, 200, 2)) + floor_z = np.zeros_like(floor_x) + self.ax.plot_surface(floor_x, floor_y, floor_z, alpha=0.2, color='gray') + + # Draw TCP trajectory + xs = [pt[0] for pt in self.trace] + ys = [pt[1] for pt in self.trace] + zs = [pt[2] for pt in self.trace] + self.ax.plot(xs, ys, zs, label="TCP Path", color='blue') + + # Draw robot as stick figure if enabled + if self.show_robot.get(): + link_lengths = [100, 80, 60, 40, 20, 10] + angles = [math.radians(joints.get(f"A{i+1}", 0)) for i in range(6)] + + x0, y0, z0 = 0, 0, 0 + x_points = [x0] + y_points = [y0] + z_points = [z0] + + for i in range(6): + dx = link_lengths[i] * math.cos(angles[i]) + dy = link_lengths[i] * math.sin(angles[i]) + dz = 0 if i < 3 else link_lengths[i] * math.sin(angles[i]) + x0 += dx + y0 += dy + z0 += dz + x_points.append(x0) + y_points.append(y0) + z_points.append(z0) + + self.ax.plot(x_points, y_points, z_points, label="Robot Arm", color='red', marker='o') + + self.ax.legend() + self.canvas.draw() + + # Update message displays + self.received_box.delete("1.0", tk.END) + self.received_box.insert(tk.END, self.server.last_received.strip() if hasattr(self.server, 'last_received') else "") + + self.sent_box.delete("1.0", tk.END) + self.sent_box.insert(tk.END, self.server.generate_message().strip()) + + time.sleep(0.2) + except Exception as e: + print(f"[Visualisation Error] {e}") + + +if __name__ == "__main__": + root = tk.Tk() + app = EchoServerGUI(root) + root.mainloop() diff --git a/src/RSIPI/inject_rsi_to_krl.py b/src/RSIPI/inject_rsi_to_krl.py index f436f48..5ca1bd6 100644 --- a/src/RSIPI/inject_rsi_to_krl.py +++ b/src/RSIPI/inject_rsi_to_krl.py @@ -1,21 +1,26 @@ def inject_rsi_to_krl(input_file, output_file=None, rsi_config="RSIGatewayv1.rsi"): """ - Injects RSI commands into a KUKA KRL (.src) program file. + Injects RSI commands into a KUKA KRL (.src) program file by: + - Declaring RSI variables. + - Creating the RSI context with a given configuration. + - Starting and stopping RSI execution around the program body. Args: input_file (str): Path to the original KRL file. - output_file (str, optional): Path to the output KRL file. Defaults to input_file if None. - rsi_config (str): RSI configuration filename. + output_file (str, optional): Output file to save modified code. Defaults to overwriting input_file. + rsi_config (str): Name of the RSI configuration (usually ending in .rsi). """ if output_file is None: - output_file = input_file + output_file = input_file # Overwrite original file if no output specified + # RSI declarations to insert at top rsi_start = """ ; RSI Variable Declarations DECL INT ret DECL INT CONTID """ + # RSI context creation and startup block rsi_middle = f""" ; Create RSI Context ret = RSI_CREATE("{rsi_config}", CONTID, TRUE) @@ -30,6 +35,7 @@ IF (ret <> RSIOK) THEN ENDIF """ + # RSI shutdown block to insert before END rsi_end = """ ; Stop RSI Execution ret = RSI_OFF() @@ -38,9 +44,11 @@ IF (ret <> RSIOK) THEN ENDIF """ + # Read original KRL file into memory with open(input_file, "r") as file: lines = file.readlines() + # Identify key structural markers in the KRL program header_end, ini_end, end_start = None, None, None for i, line in enumerate(lines): @@ -51,30 +59,21 @@ ENDIF elif line.strip().startswith("END"): end_start = i + # Validate presence of required sections if header_end is None or ini_end is None or end_start is None: raise ValueError("Required markers (DEF, ;ENDFOLD (INI), END) not found in KRL file.") + # Inject modified contents into new or overwritten file with open(output_file, "w") as file: - # Write header and RSI declarations - file.writelines(lines[:header_end + 1]) - file.write(rsi_start) - - # Write INI section - file.writelines(lines[header_end + 1:ini_end + 1]) - - # Write RSI context creation and activation - file.write(rsi_middle) - - # Write main code section - file.writelines(lines[ini_end + 1:end_start]) - - # Write RSI deactivation - file.write(rsi_end) - - # Write END line - file.write(lines[end_start]) + file.writelines(lines[:header_end + 1]) # Preserve header + file.write(rsi_start) # Add RSI declarations + file.writelines(lines[header_end + 1:ini_end + 1]) # Preserve INI block + file.write(rsi_middle) # Insert RSI start commands + file.writelines(lines[ini_end + 1:end_start]) # Preserve main body + file.write(rsi_end) # Insert RSI stop commands + file.write(lines[end_start]) # Write final END line -# Example usage: +# Example usage if __name__ == "__main__": - inject_rsi_to_krl("my_program.src", "my_program_rsi.src") \ No newline at end of file + inject_rsi_to_krl("my_program.src", "my_program_rsi.src") diff --git a/src/RSIPI/krl_to_csv_parser.py b/src/RSIPI/krl_to_csv_parser.py index 4c49f91..a7a8022 100644 --- a/src/RSIPI/krl_to_csv_parser.py +++ b/src/RSIPI/krl_to_csv_parser.py @@ -5,26 +5,43 @@ from collections import OrderedDict # noinspection PyTypeChecker class KRLParser: - """Parses KUKA KRL .src and .dat files, extracting Cartesian coordinates into CSV.""" + """ + Parses KUKA KRL .src and .dat files to extract Cartesian coordinates + and exports them into a structured CSV format. + """ def __init__(self, src_file, dat_file): + """ + Initialise the parser with the paths to the .src and .dat files. + + Args: + src_file (str): Path to the KRL .src file. + dat_file (str): Path to the KRL .dat file. + """ self.src_file = src_file self.dat_file = dat_file - self.positions = OrderedDict() + self.positions = OrderedDict() # Maintain order of appearance from .src def parse_src(self): - """Parses .src file and collects all positional references.""" + """ + Parses the .src file to extract all position references (labels like P1, P2, etc.). + These are later used to match against coordinate data from the .dat file. + """ pattern = re.compile(r"PDAT_ACT=([A-Z]+\d+)", re.IGNORECASE) with open(self.src_file, 'r') as file: for line in file: match = pattern.search(line) if match: pos_ref = match.group(1).upper() - self.positions[pos_ref] = {} + self.positions[pos_ref] = {} # Placeholder for coordinates + print("📌 Extracted labels from .src:", self.positions.keys()) def parse_dat(self): - """Parses .dat file extracting all Cartesian coordinates.""" + """ + Parses the .dat file and retrieves Cartesian coordinate data for each known position reference. + Matches only those positions previously found in the .src file. + """ pos_pattern = re.compile( r"DECL E6POS ([A-Z]+\d+)=\{X ([^,]+),Y ([^,]+),Z ([^,]+),A ([^,]+),B ([^,]+),C ([^,]+),S ([^,]+),T ([^,]+)", re.IGNORECASE @@ -45,14 +62,21 @@ class KRLParser: 'S': int(match.group(8)), 'T': int(match.group(9)), } + # Only store coordinates if the label was found in the .src if pos_name in self.positions: self.positions[pos_name] = coords print("📄 Current line:", line.strip()) + print("📌 Looking for positions in .dat:", self.positions.keys()) def export_csv(self, output_file): - """Exports the parsed positions to CSV.""" + """ + Writes the extracted position data into a CSV file. + + Args: + output_file (str): Path to the output CSV file. + """ fieldnames = ["Sequence", "PosRef", "X", "Y", "Z", "A", "B", "C", "S", "T"] with open(output_file, 'w', newline='') as csv_file: @@ -60,12 +84,13 @@ class KRLParser: writer.writeheader() for idx, (pos_ref, coords) in enumerate(self.positions.items()): - if coords: + if coords: # Skip empty entries (e.g. unmatched labels) writer.writerow({ "Sequence": idx, "PosRef": pos_ref, **coords }) + print("📥 Final positions extracted:", self.positions) diff --git a/src/RSIPI/kuka_visualizer.py b/src/RSIPI/kuka_visualiser.py similarity index 64% rename from src/RSIPI/kuka_visualizer.py rename to src/RSIPI/kuka_visualiser.py index edff8b4..345e7bf 100644 --- a/src/RSIPI/kuka_visualizer.py +++ b/src/RSIPI/kuka_visualiser.py @@ -4,23 +4,49 @@ import argparse import os -class KukaRSIVisualizer: +class KukaRSIVisualiser: + """ + Visualises robot motion and diagnostics from RSI-generated CSV logs. + + Supports: + - 3D trajectory plotting (actual vs planned) + - Joint position plotting with safety band overlays + - Force correction trend visualisation + - Optional graph export to PNG + """ + def __init__(self, csv_file, safety_limits=None): + """ + Initialise the visualiser. + + Args: + csv_file (str): Path to the RSI CSV log. + safety_limits (dict): Optional dict of axis limits (e.g., {"AIPos.A1": [-170, 170]}). + """ self.csv_file = csv_file self.safety_limits = safety_limits or {} + if not os.path.exists(csv_file): raise FileNotFoundError(f"CSV file {csv_file} not found.") + self.df = pd.read_csv(csv_file) def plot_trajectory(self, save_path=None): - """Plots the robot's trajectory in 3D space.""" + """ + Plots the 3D robot trajectory from actual and planned data. + + Args: + save_path (str): Optional path to save the figure. + """ fig = plt.figure() ax = fig.add_subplot(111, projection='3d') - ax.plot(self.df["RIst.X"], self.df["RIst.Y"], self.df["RIst.Z"], label="Actual Trajectory", linestyle='-') + ax.plot(self.df["RIst.X"], self.df["RIst.Y"], self.df["RIst.Z"], + label="Actual Trajectory", linestyle='-') if "RSol.X" in self.df.columns: - ax.plot(self.df["RSol.X"], self.df["RSol.Y"], self.df["RSol.Z"], label="Planned Trajectory", linestyle='--') + ax.plot(self.df["RSol.X"], self.df["RSol.Y"], self.df["RSol.Z"], + label="Planned Trajectory", linestyle='--') ax.set_xlabel("X Position") ax.set_ylabel("Y Position") @@ -33,10 +59,21 @@ class KukaRSIVisualizer: plt.show() def has_column(self, col): + """ + Checks if the given column exists in the dataset. + + Args: + col (str): Column name to check. + """ return col in self.df.columns def plot_joint_positions(self, save_path=None): - """Plots joint positions over time with safety limit overlays.""" + """ + Plots joint angle positions over time, with optional safety zone overlays. + + Args: + save_path (str): Optional path to save the figure. + """ plt.figure() time_series = range(len(self.df)) @@ -44,7 +81,6 @@ class KukaRSIVisualizer: if col in self.df.columns: plt.plot(time_series, self.df[col], label=col) - # 🔴 Overlay safety band if col in self.safety_limits: low, high = self.safety_limits[col] plt.axhspan(low, high, color='red', alpha=0.1, label=f"{col} Safe Zone") @@ -59,7 +95,12 @@ class KukaRSIVisualizer: plt.show() def plot_force_trends(self, save_path=None): - """Plots force trends if force data is logged.""" + """ + Plots force correction trends (PosCorr.*) over time, if present. + + Args: + save_path (str): Optional path to save the figure. + """ force_columns = ["PosCorr.X", "PosCorr.Y", "PosCorr.Z"] plt.figure() time_series = range(len(self.df)) @@ -82,7 +123,12 @@ class KukaRSIVisualizer: plt.show() def export_graphs(self, export_dir="exports"): - """Exports all graphs to PNG/PDF.""" + """ + Saves all graphs (trajectory, joints, force) as PNG images. + + Args: + export_dir (str): Output directory. + """ os.makedirs(export_dir, exist_ok=True) self.plot_trajectory(save_path=os.path.join(export_dir, "trajectory.png")) self.plot_joint_positions(save_path=os.path.join(export_dir, "joint_positions.png")) @@ -91,7 +137,7 @@ class KukaRSIVisualizer: if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Visualize RSI data logs.") + parser = argparse.ArgumentParser(description="Visualise RSI data logs.") parser.add_argument("csv_file", type=str, help="Path to the RSI CSV log file.") parser.add_argument("--export", action="store_true", help="Export graphs as PNG/PDF.") parser.add_argument("--limits", type=str, help="Optional .rsi.xml file to overlay safety bands") @@ -101,13 +147,13 @@ if __name__ == "__main__": if args.limits: from src.RSIPI.rsi_limit_parser import parse_rsi_limits limits = parse_rsi_limits(args.limits) - visualizer = KukaRSIVisualizer(args.csv_file, safety_limits=limits) + visualiser = Kukarsivisualiser(args.csv_file, safety_limits=limits) else: - visualizer = KukaRSIVisualizer(args.csv_file) + visualiser = Kukarsivisualiser(args.csv_file) - visualizer.plot_trajectory() - visualizer.plot_joint_positions() - visualizer.plot_force_trends() + visualiser.plot_trajectory() + visualiser.plot_joint_positions() + visualiser.plot_force_trends() if args.export: - visualizer.export_graphs() + visualiser.export_graphs() diff --git a/src/RSIPI/rsi_api.py b/src/RSIPI/rsi_api.py index 078cf46..0db38ed 100644 --- a/src/RSIPI/rsi_api.py +++ b/src/RSIPI/rsi_api.py @@ -4,7 +4,7 @@ import json import matplotlib.pyplot as plt from .rsi_client import RSIClient from .rsi_graphing import RSIGraphing -from .kuka_visualizer import KukaRSIVisualizer +from .kuka_visualiser import Kukarsivisualiser from .krl_to_csv_parser import KRLParser from .inject_rsi_to_krl import inject_rsi_to_krl import threading @@ -232,7 +232,7 @@ class RSIAPI: csv_file (str): Path to CSV log file. export (bool): Whether to export the plots. """ - visualizer = KukaRSIVisualizer(csv_file) + visualizer = Kukarsivisualiser(csv_file) visualizer.plot_trajectory() visualizer.plot_joint_positions() visualizer.plot_force_trends() diff --git a/src/RSIPI/rsi_cli.py b/src/RSIPI/rsi_cli.py index c5b38b8..0c9f324 100644 --- a/src/RSIPI/rsi_cli.py +++ b/src/RSIPI/rsi_cli.py @@ -1,5 +1,5 @@ from .rsi_api import RSIAPI -from .kuka_visualizer import KukaRSIVisualizer +from .kuka_visualiser import Kukarsivisualiser from .krl_to_csv_parser import KRLParser from .inject_rsi_to_krl import inject_rsi_to_krl @@ -299,7 +299,7 @@ Available Commands: def visualize(self, csv_file, export=False): try: - visualizer = KukaRSIVisualizer(csv_file) + visualizer = Kukarsivisualiser(csv_file) visualizer.plot_trajectory() visualizer.plot_joint_positions() visualizer.plot_force_trends() diff --git a/src/RSIPI/rsi_config.py b/src/RSIPI/rsi_config.py index 9555018..22a2d36 100644 --- a/src/RSIPI/rsi_config.py +++ b/src/RSIPI/rsi_config.py @@ -2,20 +2,27 @@ import xml.etree.ElementTree as ET import logging from src.RSIPI.rsi_limit_parser import parse_rsi_limits -# ✅ Configure Logging (Allow it to be turned off) -LOGGING_ENABLED = True # Change to False to disable logging +# ✅ Configure Logging (toggleable) +LOGGING_ENABLED = True # Change to False to silence logging output if LOGGING_ENABLED: logging.basicConfig( - filename="rsi_config.log", # Save logs to file - level=logging.DEBUG, # Log everything for debugging + filename="rsi_config.log", + level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) -class RSIConfig: - """Processes the RSI config.xml file to determine communication parameters.""" +class RSIConfig: + """ + Loads and parses the RSI EthernetConfig.xml file, extracting: + - Network communication settings + - Variables to send/receive (with correct structure) + - Optional safety limit data from .rsi.xml file + """ + + # Known internal RSI variables and their structure internal = { "ComStatus": "String", "RIst": ["X", "Y", "Z", "A", "B", "C"], @@ -37,6 +44,13 @@ class RSIConfig: } def __init__(self, config_file, rsi_limits_file=None): + """ + Initialise config loader. + + Args: + config_file (str): Path to the RSI EthernetConfig.xml file. + rsi_limits_file (str): Optional path to .rsi.xml safety limits. + """ self.config_file = config_file self.rsi_limits_file = rsi_limits_file self.safety_limits = {} @@ -46,9 +60,10 @@ class RSIConfig: self.receive_variables = {} self.load_config() - self.load_safety_limits() # ✅ NEW STEP + self.load_safety_limits() # Optional safety overlay def load_safety_limits(self): + """Loads safety bands from an optional .rsi.xml file, if provided.""" if self.rsi_limits_file: try: self.safety_limits = parse_rsi_limits(self.rsi_limits_file) @@ -59,19 +74,26 @@ class RSIConfig: @staticmethod def strip_def_prefix(tag): - """Remove 'DEF_' prefix from a variable name.""" + """Removes DEF_ prefix from variable names.""" return tag.replace("DEF_", "") def process_internal_variable(self, tag): - """Process structured internal variables.""" + """Initialises structured internal variables based on known RSI types.""" if tag in self.internal: if isinstance(self.internal[tag], list): - return {key: 0.0 for key in self.internal[tag]} # Store as dict - return self.internal[tag] # Store as default value + return {key: 0.0 for key in self.internal[tag]} + return self.internal[tag] return None def process_variable_structure(self, var_dict, tag, var_type): - """Handles structured variables (e.g., `Tech.T2`, `RKorr.X`).""" + """ + Parses and groups structured variables, e.g., Tech.T2 → {'Tech': {'T2': 0.0}}. + + Args: + var_dict (dict): Either send_variables or receive_variables. + tag (str): The variable tag from XML. + var_type (str): The TYPE attribute from XML. + """ if tag in self.internal: var_dict[tag] = self.process_internal_variable(tag) elif "." in tag: @@ -84,7 +106,7 @@ class RSIConfig: @staticmethod def get_default_value(var_type): - """Returns a default value based on TYPE.""" + """Returns a suitable default value for a given variable type.""" if var_type == "BOOL": return False elif var_type == "STRING": @@ -93,16 +115,20 @@ class RSIConfig: return 0 elif var_type == "DOUBLE": return 0.0 - return None # Unknown type fallback + return None # Fallback for unknown types def load_config(self): - """Load and parse the config.xml file, ensuring variables are structured correctly.""" + """ + Parses the RSI config.xml, extracting: + - IP/port and communication mode + - Structured send and receive variable templates + """ try: logging.info(f"Loading config file: {self.config_file}") tree = ET.parse(self.config_file) root = tree.getroot() - # Read Network Settings + # Extract network settings config = root.find("CONFIG") self.network_settings = { "ip": config.find("IP_NUMBER").text.strip(), @@ -112,22 +138,20 @@ class RSIConfig: } logging.info(f"Network settings loaded: {self.network_settings}") - # Read SEND Section (Values we send to RSI client) + # Extract section send_section = root.find("SEND/ELEMENTS") for element in send_section.findall("ELEMENT"): tag = self.strip_def_prefix(element.get("TAG")) var_type = element.get("TYPE") - - if tag != "FREE": # ✅ Ignore `FREE` variables + if tag != "FREE": # Ignore placeholder entries self.process_variable_structure(self.send_variables, tag, var_type) - # Read RECEIVE Section (Values we receive from RSI client) + # Extract section receive_section = root.find("RECEIVE/ELEMENTS") for element in receive_section.findall("ELEMENT"): tag = self.strip_def_prefix(element.get("TAG")) var_type = element.get("TYPE") - - if tag != "FREE": # ✅ Ignore `FREE` variables + if tag != "FREE": self.process_variable_structure(self.receive_variables, tag, var_type) logging.info("Configuration successfully loaded.") @@ -138,10 +162,13 @@ class RSIConfig: logging.error(f"Error loading {self.config_file}: {e}") def get_network_settings(self): + """Returns network configuration (IP, port, SENTYPE, ONLYSEND).""" return self.network_settings def get_send_variables(self): + """Returns structured send variable dictionary.""" return self.send_variables def get_receive_variables(self): + """Returns structured receive variable dictionary.""" return self.receive_variables diff --git a/src/RSIPI/rsi_echo_server.py b/src/RSIPI/rsi_echo_server.py index 146c86f..3f463c1 100644 --- a/src/RSIPI/rsi_echo_server.py +++ b/src/RSIPI/rsi_echo_server.py @@ -1,4 +1,3 @@ - import socket import time import xml.etree.ElementTree as ET @@ -6,6 +5,7 @@ import logging import threading from src.RSIPI.rsi_config import RSIConfig +# ✅ Toggle logging for debugging purposes LOGGING_ENABLED = True if LOGGING_ENABLED: @@ -16,21 +16,39 @@ if LOGGING_ENABLED: datefmt="%Y-%m-%d %H:%M:%S" ) + class EchoServer: + """ + Simulates a KUKA RSI UDP server for testing. + + - Responds to incoming RSI correction commands. + - Updates internal position state (absolute/relative). + - Returns structured XML messages (like a real robot). + """ + def __init__(self, config_file, delay_ms=4, mode="relative"): + """ + Initialise the echo server. + + Args: + config_file (str): Path to RSI EthernetConfig.xml. + delay_ms (int): Delay between messages in milliseconds. + mode (str): Correction mode ("relative" or "absolute"). + """ self.config = RSIConfig(config_file) network_settings = self.config.get_network_settings() - self.server_address = ("0.0.0.0", 50000) - self.client_address = ("127.0.0.1", network_settings["port"]) + self.server_address = ("0.0.0.0", 50000) # Local bind + self.client_address = ("127.0.0.1", network_settings["port"]) # Client to echo back to self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_socket.bind(self.server_address) + self.last_received = None self.ipoc_value = 123456 - self.delay_ms = delay_ms / 1000 - self.mode = mode.lower() # relative or absolute + self.delay_ms = delay_ms / 1000 # Convert to seconds + self.mode = mode.lower() - # Internal simulation state + # Internal state to simulate robot values self.state = { "RIst": {k: 0.0 for k in ["X", "Y", "Z", "A", "B", "C"]}, "AIPos": {f"A{i}": 0.0 for i in range(1, 7)}, @@ -46,12 +64,16 @@ class EchoServer: print(f"✅ Echo Server started in {self.mode.upper()} mode.") def receive_and_process(self): + """ + Handles one incoming UDP message and updates the internal state accordingly. + Supports RKorr, AKorr, DiO, DiL, and IPOC updates. + """ try: self.udp_socket.settimeout(self.delay_ms) data, addr = self.udp_socket.recvfrom(1024) xml_string = data.decode() root = ET.fromstring(xml_string) - self.last_received = xml_string # ✅ Save for GUI + self.last_received = xml_string for elem in root: tag = elem.tag @@ -59,11 +81,13 @@ class EchoServer: for axis, value in elem.attrib.items(): value = float(value) if tag == "RKorr" and axis in self.state["RIst"]: + # Apply Cartesian correction if self.mode == "relative": self.state["RIst"][axis] += value else: self.state["RIst"][axis] = value elif tag == "AKorr" and axis in self.state["AIPos"]: + # Apply joint correction if self.mode == "relative": self.state["AIPos"][axis] += value else: @@ -76,7 +100,7 @@ class EchoServer: logging.debug(f"✅ Processed input: {ET.tostring(root).decode()}") except socket.timeout: - pass # Just ignore and continue + pass # No data within delay window except ConnectionResetError: print("⚠️ Connection was reset by client. Waiting before retry...") time.sleep(0.5) @@ -84,6 +108,10 @@ class EchoServer: print(f"[ERROR] Failed to process input: {e}") def generate_message(self): + """ + Creates a reply XML message based on current state. + Format matches KUKA RSI's expected response structure. + """ root = ET.Element("Rob", Type="KUKA") for key in ["RIst", "AIPos", "ELPos"]: @@ -98,6 +126,10 @@ class EchoServer: return ET.tostring(root, encoding="utf-8").decode() def send_message(self): + """ + Main loop to receive input, update state, and send reply. + Runs in a background thread until stopped. + """ while self.running: try: self.receive_and_process() @@ -110,16 +142,19 @@ class EchoServer: time.sleep(1) def start(self): + """Starts the echo server loop in a background thread.""" self.running = True self.thread.start() def stop(self): + """Stops the echo server and cleans up the socket.""" print("🛑 Stopping Echo Server...") self.running = False self.thread.join() self.udp_socket.close() print("✅ Echo Server Stopped.") + if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Run Echo Server for RSI Simulation") diff --git a/src/RSIPI/rsi_graphing.py b/src/RSIPI/rsi_graphing.py index b1c4d9b..d7913cb 100644 --- a/src/RSIPI/rsi_graphing.py +++ b/src/RSIPI/rsi_graphing.py @@ -6,20 +6,33 @@ from .rsi_client import RSIClient import csv class RSIGraphing: - """Handles real-time and CSV-based graphing for RSI analysis with alerts and threshold monitoring.""" + """ + Handles real-time plotting of RSI data with support for: + - Position/velocity/acceleration/force monitoring + - Live deviation alerts + - Optional overlay of planned vs actual trajectories + """ def __init__(self, client, mode="position", overlay=False, plan_file=None): - """Initialize graphing for real-time plotting.""" + """ + Initialise live graphing interface. + + Args: + client (RSIClient): Live RSI client instance providing data. + mode (str): One of "position", "velocity", "acceleration", "force". + overlay (bool): Whether to show planned vs. actual overlays. + plan_file (str): Optional CSV file containing planned trajectory. + """ self.client = client self.mode = mode - self.overlay = overlay # Enable/Disable planned vs. actual overlay - self.alerts_enabled = True # Default to alerts on - self.deviation_threshold = 5.0 # Default deviation threshold (mm) - self.force_threshold = 10.0 # Default force threshold (Nm) + self.overlay = overlay + self.alerts_enabled = True + self.deviation_threshold = 5.0 # mm + self.force_threshold = 10.0 # Nm self.fig, self.ax = plt.subplots(figsize=(10, 6)) - # ✅ Data storage - self.time_data = deque(maxlen=100) # Store last 100 data points + # Live data buffers + self.time_data = deque(maxlen=100) self.position_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]} self.velocity_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]} self.acceleration_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]} @@ -29,7 +42,7 @@ class RSIGraphing: self.previous_velocities = {"X": 0, "Y": 0, "Z": 0} self.previous_time = time.time() - # ✅ Planned movement overlay storage + # Overlay comparison self.planned_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]} self.deviation_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]} @@ -40,16 +53,18 @@ class RSIGraphing: plt.show() def update_graph(self, frame): - """Update the live graph based on selected mode and check for alerts.""" + """ + Called periodically by matplotlib to refresh live graph based on current mode. + Also checks for force spikes and deviation alerts. + """ current_time = time.time() dt = current_time - self.previous_time self.previous_time = current_time - # ✅ Fetch live data from RSI client position = self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0}) force = self.client.receive_variables.get("MaCur", {"A1": 0, "A2": 0, "A3": 0, "A4": 0, "A5": 0, "A6": 0}) - # ✅ Compute velocity and acceleration + # Compute motion derivatives for axis in ["X", "Y", "Z"]: velocity = (position[axis] - self.previous_positions[axis]) / dt if dt > 0 else 0 acceleration = (velocity - self.previous_velocities[axis]) / dt if dt > 0 else 0 @@ -65,7 +80,7 @@ class RSIGraphing: self.time_data.append(time.strftime("%H:%M:%S")) - # ✅ Planned vs. Actual movement overlay + # Compare to planned overlay if self.overlay and self.planned_data: for axis in ["X", "Y", "Z"]: planned_value = self.planned_data[axis][-1] if len(self.planned_data[axis]) > 0 else position[axis] @@ -73,17 +88,14 @@ class RSIGraphing: deviation = abs(position[axis] - planned_value) self.deviation_data[axis].append(deviation) - # ✅ Trigger deviation alert if self.alerts_enabled and deviation > self.deviation_threshold: print(f"⚠️ Deviation Alert! {axis} exceeds {self.deviation_threshold} mm (Deviation: {deviation:.2f} mm)") - # ✅ Trigger force spike alert if self.alerts_enabled: for axis in ["A1", "A2", "A3", "A4", "A5", "A6"]: if self.force_data[axis][-1] > self.force_threshold: print(f"⚠️ Force Spike Alert! {axis} exceeds {self.force_threshold} Nm (Force: {self.force_data[axis][-1]:.2f} Nm)") - # ✅ Clear the plot self.ax.clear() if self.mode == "position": @@ -103,7 +115,7 @@ class RSIGraphing: self.ax.tick_params(axis='x', rotation=45) def change_mode(self, mode): - """Change the graphing mode dynamically.""" + """Switch graphing mode at runtime (position, velocity, acceleration, force).""" if mode in ["position", "velocity", "acceleration", "force"]: self.mode = mode print(f"✅ Graphing mode changed to: {mode}") @@ -111,7 +123,7 @@ class RSIGraphing: print("❌ Invalid mode. Available: position, velocity, acceleration, force") def set_alert_threshold(self, alert_type, threshold): - """Set threshold for deviation or force alerts.""" + """Update threshold values for alerts.""" if alert_type == "deviation": self.deviation_threshold = threshold elif alert_type == "force": @@ -119,18 +131,27 @@ class RSIGraphing: print(f"✅ {alert_type.capitalize()} alert threshold set to {threshold}") def enable_alerts(self, enable): - """Enable or disable alerts.""" + """Enable or disable real-time alerts.""" self.alerts_enabled = enable print(f"✅ Alerts {'enabled' if enable else 'disabled'}.") def stop(self): - """Stop live plotting loop by closing the figure.""" + """Gracefully stop plotting by closing the figure.""" plt.close(self.fig) - def plot_csv_file(csv_path): - """Plot X/Y/Z position over time from a CSV log file.""" - import matplotlib.pyplot as plt + def load_plan(self, plan_file): + """Load planned XYZ trajectory from CSV for overlay comparison.""" + with open(plan_file, newline='') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + for axis in ["X", "Y", "Z"]: + key = f"Send.RKorr.{axis}" + value = float(row.get(key, 0.0)) + self.planned_data[axis].append(value) + @staticmethod + def plot_csv_file(csv_path): + """Standalone method to plot XYZ position from a log file (no live client required).""" timestamps = [] x_data, y_data, z_data = [], [], [] @@ -154,6 +175,7 @@ class RSIGraphing: plt.tight_layout() plt.show() + if __name__ == "__main__": import argparse @@ -169,4 +191,3 @@ if __name__ == "__main__": if not args.alerts: graphing.enable_alerts(False) - diff --git a/src/RSIPI/rsi_limit_parser.py b/src/RSIPI/rsi_limit_parser.py index 63c60ce..947900b 100644 --- a/src/RSIPI/rsi_limit_parser.py +++ b/src/RSIPI/rsi_limit_parser.py @@ -1,9 +1,14 @@ import xml.etree.ElementTree as ET def parse_rsi_limits(xml_path): + """ + Parses a .rsi.xml file (RSIObject format) and returns structured safety limits. + + Returns: + dict: Structured limits in the form { "RKorr.X": (min, max), "AKorr.A1": (min, max), ... } + """ tree = ET.parse(xml_path) root = tree.getroot() - raw_limits = {} for rsi_object in root.findall("RSIObject"): @@ -11,9 +16,10 @@ def parse_rsi_limits(xml_path): params = rsi_object.find("Parameters") if params is None: - continue + continue # Skip malformed entries if obj_type == "POSCORR": + # Cartesian position correction limits for param in params.findall("Parameter"): name = param.attrib["Name"] value = float(param.attrib["ParamValue"]) @@ -30,11 +36,13 @@ def parse_rsi_limits(xml_path): elif name == "UpperLimZ": raw_limits["RKorr.Z_max"] = value elif name == "MaxRotAngle": + # Apply symmetric bounds to A/B/C for axis in ["A", "B", "C"]: raw_limits[f"RKorr.{axis}_min"] = -value raw_limits[f"RKorr.{axis}_max"] = value elif obj_type == "AXISCORR": + # Joint axis correction limits for param in params.findall("Parameter"): name = param.attrib["Name"] value = float(param.attrib["ParamValue"]) @@ -44,6 +52,7 @@ def parse_rsi_limits(xml_path): raw_limits[key] = value elif obj_type == "AXISCORREXT": + # External axis correction limits for param in params.findall("Parameter"): name = param.attrib["Name"] value = float(param.attrib["ParamValue"]) @@ -52,7 +61,7 @@ def parse_rsi_limits(xml_path): key = f"AKorr.E{axis}_{'min' if 'Lower' in name else 'max'}" raw_limits[key] = value - # Build structured dictionary with only valid (min, max) pairs + # Combine _min and _max entries into structured tuples structured_limits = {} for key in list(raw_limits.keys()): if key.endswith("_min"): diff --git a/src/RSIPI/safety_manager.py b/src/RSIPI/safety_manager.py index d67b59b..0284e85 100644 --- a/src/RSIPI/safety_manager.py +++ b/src/RSIPI/safety_manager.py @@ -2,24 +2,43 @@ import logging class SafetyManager: + """ + Enforces safety limits for RSI motion commands. + + Supports: + - Emergency stop logic (halts all validation) + - Limit enforcement for RKorr / AKorr / other variables + - Runtime limit updates + """ + def __init__(self, limits=None): """ - limits: Optional dictionary of variable limits in the form: - { - 'RKorr.X': (-5.0, 5.0), - 'AKorr.A1': (-6.0, 6.0), - ... - } + Args: + limits (dict): Optional safety limits in the form: + { + 'RKorr.X': (-5.0, 5.0), + 'AKorr.A1': (-6.0, 6.0), + ... + } """ self.limits = limits if limits is not None else {} self.e_stop = False - self.last_values = {} + self.last_values = {} # Reserved for future tracking or override detection def validate(self, path: str, value: float) -> float: """ - Validate a single variable update against safety rules. - Raises RuntimeError if E-Stop is active. - Raises ValueError if out of allowed bounds. + Validates an individual correction value. + + Args: + path (str): Key of the variable, e.g., "RKorr.X" + value (float): Proposed value to apply + + Returns: + float: Value if valid + + Raises: + RuntimeError: If emergency stop is active + ValueError: If value exceeds allowed bounds """ if self.e_stop: logging.warning(f"SafetyManager: {path} update blocked (E-STOP active)") @@ -34,21 +53,21 @@ class SafetyManager: return value def emergency_stop(self): - """Activate emergency stop mode (blocks all motion).""" + """Activates emergency stop: all motion validation will fail.""" self.e_stop = True def reset_stop(self): - """Reset emergency stop (motion allowed again).""" + """Resets emergency stop, allowing motion again.""" self.e_stop = False def set_limit(self, path: str, min_val: float, max_val: float): - """Manually set or override a safety limit at runtime.""" + """Sets or overrides a safety limit at runtime.""" self.limits[path] = (min_val, max_val) def get_limits(self): - """Return a copy of current limits.""" + """Returns a copy of all current safety limits.""" return self.limits.copy() def is_stopped(self): + """Returns whether the emergency stop is active.""" return self.e_stop - diff --git a/src/RSIPI/trajectory_planner.py b/src/RSIPI/trajectory_planner.py index 11a8576..d512a7f 100644 --- a/src/RSIPI/trajectory_planner.py +++ b/src/RSIPI/trajectory_planner.py @@ -1,19 +1,20 @@ from RSIPI.safety_manager import SafetyManager +import time def generate_trajectory(start, end, steps=100, space="cartesian", mode="relative", include_resets=True): """ - Generates a trajectory from start to end in N steps. + Generates a trajectory from start to end across N steps. Args: - start (dict): starting pose/position - end (dict): ending pose/position - steps (int): number of points in the trajectory - space (str): "cartesian" or "joint" (for logging/use clarity) - mode (str): "relative" or "absolute" - include_resets (bool): if True, adds reset-to-zero between pulses (only for relative mode) + start (dict): Starting pose or joint values. + end (dict): Target pose or joint values. + steps (int): Number of interpolation steps. + space (str): "cartesian" or "joint" (for API dispatch and validation). + mode (str): "relative" or "absolute" movement type. + include_resets (bool): Inserts zero-correction resets between steps in relative mode. Returns: - list[dict]: trajectory points + list[dict]: List of movement command dictionaries. """ if mode not in ["relative", "absolute"]: raise ValueError("mode must be 'relative' or 'absolute'") @@ -23,7 +24,10 @@ def generate_trajectory(start, end, steps=100, space="cartesian", mode="relative axes = start.keys() trajectory = [] + # Optional safety check hook — assumes SafetyManager has static validation methods safety_fn = SafetyManager.check_cartesian_limits if space == "cartesian" else SafetyManager.check_joint_limits + global enforce_safety + enforce_safety = hasattr(SafetyManager, "check_cartesian_limits") # Enable only if those methods exist for i in range(1, steps + 1): point = {} @@ -31,25 +35,29 @@ def generate_trajectory(start, end, steps=100, space="cartesian", mode="relative delta = end[axis] - start[axis] value = start[axis] + (delta * i / steps) - if mode == "relative": - point[axis] = delta / steps - else: - point[axis] = value + point[axis] = delta / steps if mode == "relative" else value + # Optional safety enforcement if enforce_safety and not safety_fn(point): raise ValueError(f"⚠️ Safety check failed at step {i}: {point}") trajectory.append(point) if mode == "relative" and include_resets: + # Insert a zero-correction step to prevent drift trajectory.append({axis: 0.0 for axis in axes}) return trajectory def execute_trajectory(api, trajectory, space="cartesian", rate=0.004): """ - Sends a trajectory by updating send variables at a fixed rate. - Use only with absolute motion or slow relative steps. + Sends a list of corrections to the RSI API at fixed intervals. + + Args: + api: An RSI-compatible API object with update_cartesian / update_joints methods. + trajectory (list[dict]): Movement steps generated by generate_trajectory(). + space (str): "cartesian" or "joint". + rate (float): Time between steps in seconds (default = 4ms). """ for point in trajectory: if space == "cartesian": diff --git a/src/RSIPI/xml_handler.py b/src/RSIPI/xml_handler.py index 94f076b..cd3edff 100644 --- a/src/RSIPI/xml_handler.py +++ b/src/RSIPI/xml_handler.py @@ -1,16 +1,29 @@ import xml.etree.ElementTree as ET class XMLGenerator: - """Converts send and receive variables into properly formatted XML messages.""" + """ + Converts structured dictionaries of RSI send/receive variables into + valid XML strings for UDP transmission to/from the robot controller. + """ @staticmethod def generate_send_xml(send_variables, network_settings): - """Generate the send XML message dynamically based on send variables.""" + """ + Build an outgoing XML message based on the current send variables. + + Args: + send_variables (dict): Structured dictionary of values to send. + network_settings (dict): Contains 'sentype' used for the root element. + + Returns: + str: XML-formatted string ready for UDP transmission. + """ root = ET.Element("Sen", Type=network_settings["sentype"]) - print("XML Send Variables : ", send_variables) + + # Convert structured keys (e.g. RKorr, Tech) and flat elements (e.g. IPOC) for key, value in send_variables.items(): if key == "FREE": - continue + continue # Skip unused placeholder fields if isinstance(value, dict): element = ET.SubElement(root, key) @@ -23,16 +36,23 @@ class XMLGenerator: @staticmethod def generate_receive_xml(receive_variables): - """Generate the receive XML message dynamically based on receive variables.""" - root = ET.Element("Rob", Type="KUKA") # ✅ Root with Type="KUKA" + """ + Build an incoming XML message for emulation/testing purposes. + + Args: + receive_variables (dict): Structured dictionary of values to simulate reception. + + Returns: + str: XML-formatted string mimicking a KUKA robot's reply. + """ + root = ET.Element("Rob", Type="KUKA") for key, value in receive_variables.items(): - if isinstance(value, dict) or hasattr(value, "items"): # ✅ Handle dictionaries as elements with attributes + if isinstance(value, dict) or hasattr(value, "items"): element = ET.SubElement(root, key) for sub_key, sub_value in value.items(): element.set(sub_key, f"{float(sub_value):.2f}") - else: # ✅ Handle standard elements with text values + else: ET.SubElement(root, key).text = str(value) return ET.tostring(root, encoding="utf-8").decode() -