Commented all files.

This commit is contained in:
Adam 2025-04-13 22:48:42 +01:00
parent 08792f8182
commit 5747201d7f
14 changed files with 627 additions and 176 deletions

View File

@ -1,14 +1,28 @@
import xml.etree.ElementTree as ET
class ConfigParser:
"""Handles parsing the RSI config file and generating structured send/receive variables."""
"""
Parses an RSI XML configuration file to extract structured variable definitions and
network settings for both sending and receiving messages. Also integrates optional
safety limits from an RSI limits XML file.
"""
def __init__(self, config_file, rsi_limits_file=None):
"""Initialize with the configuration file and process send/receive structures."""
"""
Constructor method that loads the config file, parses variable definitions, and optionally
loads safety limits.
Args:
config_file (str): Path to the RSI_EthernetConfig.xml file.
rsi_limits_file (str, optional): Path to .rsi.xml file containing safety limits.
"""
from src.RSIPI.rsi_limit_parser import parse_rsi_limits
self.config_file = config_file
self.rsi_limits_file = rsi_limits_file
self.safety_limits = {}
self.config_file = config_file
# Defines known internal variable structures used in RSI messaging
self.internal_structure = {
"ComStatus": "String",
"RIst": {"X":0, "Y":0, "Z":0, "A":0, "B":0, "C":0},
@ -36,18 +50,19 @@ class ConfigParser:
"Tech.T5": {"T51":0, "T52":0, "T53":0, "T54":0, "T55":0, "T56":0, "T57":0, "T58":0, "T59":0, "T510":0},
"Tech.T6": {"T61":0, "T62":0, "T63":0, "T64":0, "T65":0, "T66":0, "T67":0, "T68":0, "T69":0, "T610":0},
}
self.network_settings = {}
self.receive_variables,self.send_variables = self.process_config()
# ✅ Rename Tech.XX keys to Tech in send and receive variables at the last step
self.network_settings = {}
self.receive_variables, self.send_variables = self.process_config()
# Flatten Tech.CX and Tech.TX keys into a single 'Tech' dictionary
self.rename_tech_keys(self.send_variables)
self.rename_tech_keys(self.receive_variables)
# ✅ Ensure IPOC is always present in send variables
# Ensure IPOC is always included in send variables
if "IPOC" not in self.send_variables:
self.send_variables.update({"IPOC": 0})
self.send_variables["IPOC"] = 0
# Load safety limits from .rsi.xml if provided
# Optionally load safety limits from .rsi.xml file
if self.rsi_limits_file:
try:
self.safety_limits = parse_rsi_limits(self.rsi_limits_file)
@ -59,7 +74,12 @@ class ConfigParser:
print(f"✅ Final Receive Variables: {self.receive_variables}")
def process_config(self):
"""Parse the config file and create structured send and receive variables."""
"""
Parses the RSI config file and builds the send/receive variable dictionaries.
Returns:
tuple: (send_vars, receive_vars) structured dictionaries.
"""
send_vars = {}
receive_vars = {}
@ -67,7 +87,7 @@ class ConfigParser:
tree = ET.parse(self.config_file)
root = tree.getroot()
# ✅ Extract network settings
# Extract <CONFIG> section for IP/port/etc.
config = root.find("CONFIG")
if config is None:
raise ValueError("❌ Missing <CONFIG> section in RSI_EthernetConfig.xml")
@ -76,17 +96,15 @@ class ConfigParser:
"ip": config.find("IP_NUMBER").text.strip() if config.find("IP_NUMBER") is not None else None,
"port": int(config.find("PORT").text.strip()) if config.find("PORT") is not None else None,
"sentype": config.find("SENTYPE").text.strip() if config.find("SENTYPE") is not None else None,
"onlysend": config.find("ONLYSEND").text.strip().upper() == "TRUE" if config.find(
"ONLYSEND") is not None else False,
"onlysend": config.find("ONLYSEND").text.strip().upper() == "TRUE" if config.find("ONLYSEND") is not None else False,
}
# ✅ Debugging output to check loaded values
print(f"✅ Loaded network settings: {self.network_settings}")
if None in self.network_settings.values():
raise ValueError("❌ Missing one or more required network settings (ip, port, sentype, onlysend)")
# Parse SEND section
# Parse SEND section
send_section = root.find("SEND/ELEMENTS")
if send_section is not None:
for element in send_section.findall("ELEMENT"):
@ -94,7 +112,7 @@ class ConfigParser:
var_type = element.get("TYPE", "")
self.process_variable_structure(send_vars, tag, var_type)
# Parse RECEIVE section
# Parse RECEIVE section
receive_section = root.find("RECEIVE/ELEMENTS")
if receive_section is not None:
for element in receive_section.findall("ELEMENT"):
@ -102,9 +120,6 @@ class ConfigParser:
var_type = element.get("TYPE", "")
self.process_variable_structure(receive_vars, tag, var_type)
print(f"✅ Final Send Variables: {send_vars}")
print(f"✅ Final Receive Variables: {receive_vars}")
return send_vars, receive_vars
except Exception as e:
@ -112,42 +127,63 @@ class ConfigParser:
return {}, {}
def process_variable_structure(self, var_dict, tag, var_type, indx=""):
"""Handles structured and simple variables based on internal structure and TYPE attribute."""
tag = tag.replace("DEF_", "") # ✅ Strip "DEF_" prefix
"""
Processes and assigns a variable to the dictionary based on its tag and type.
Args:
var_dict (dict): Dictionary to add variable to.
tag (str): Variable tag (can be nested like Tech.T1).
var_type (str): Variable type (e.g. BOOL, DOUBLE, STRING).
indx (str): Optional index (unused).
"""
tag = tag.replace("DEF_", "") # Remove DEF_ prefix if present
print(f"🔍 Assigning {tag}: INDX={indx}, TYPE={var_type}")
if tag in self.internal_structure:
# If pre-defined internally, copy structure
internal_value = self.internal_structure[tag]
if isinstance(internal_value, dict):
var_dict[tag] = internal_value.copy()
else:
var_dict[tag] = internal_value # ✅ Assign default internal value (e.g., "EStr Test")
var_dict[tag] = internal_value.copy() if isinstance(internal_value, dict) else internal_value
print(f"✅ INTERNAL Match: {tag} -> {var_dict[tag]}")
elif "." in tag:
# Handle nested dictionary e.g. Tech.T21 -> { 'Tech': { 'T21': 0.0 } }
parent, subkey = tag.split(".", 1)
if parent not in var_dict:
var_dict[parent] = {}
var_dict[parent][subkey] = self.get_default_value(var_type)
print(f"📂 Assigned '{tag}' as nested dictionary under '{parent}': {var_dict[parent]}")
else:
# Standard single-value variable
var_dict[tag] = self.get_default_value(var_type)
print(f"📄 Assigned Standard Value: '{tag}' -> {var_dict[tag]}")
@staticmethod
def rename_tech_keys(var_dict):
"""Rename Tech.XX keys to Tech in send and receive variables at the last step."""
"""
Combines all Tech.XX keys into a single 'Tech' dictionary.
Args:
var_dict (dict): The variable dictionary to modify.
"""
tech_data = {}
for key in list(var_dict.keys()):
if key.startswith("Tech."):
tech_data.update(var_dict.pop(key)) # ✅ Merge all Tech.XX keys into Tech
tech_data.update(var_dict.pop(key))
if tech_data:
var_dict["Tech"] = tech_data # ✅ Store under Tech
var_dict["Tech"] = tech_data
print(f"✅ Renamed Tech.XX keys to 'Tech': {var_dict['Tech']}")
@staticmethod
def get_default_value(var_type):
"""Returns a default value based on TYPE."""
"""
Returns a default Python value based on RSI TYPE.
Args:
var_type (str): RSI type attribute.
Returns:
Default Python value.
"""
if var_type == "BOOL":
return False
elif var_type == "STRING":
@ -156,8 +192,13 @@ class ConfigParser:
return 0
elif var_type == "DOUBLE":
return 0.0
return None # Unknown type fallback
return None
def get_network_settings(self):
"""Return the extracted network settings."""
"""
Returns extracted IP, port, and message mode settings.
Returns:
dict: Network settings extracted from the config file.
"""
return self.network_settings

View File

@ -0,0 +1,201 @@
import tkinter as tk
from tkinter import ttk, filedialog
import threading
import time
from src.RSIPI.rsi_echo_server import EchoServer
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from mpl_toolkits.mplot3d import Axes3D
import numpy as np
import os
import math
class EchoServerGUI:
"""
Graphical interface for running and visualising the RSI Echo Server.
Provides live feedback of robot TCP position and joint states, along with XML message logs.
"""
def __init__(self, master):
"""
Initialises the GUI, default values, and layout.
Args:
master (tk.Tk): Root tkinter window.
"""
self.master = master
self.master.title("RSI Echo Server GUI")
self.master.geometry("1300x800")
# Configurable input variables
self.config_file = tk.StringVar(value="RSI_EthernetConfig.xml")
self.mode = tk.StringVar(value="relative")
self.delay = tk.IntVar(value=4)
self.show_robot = tk.BooleanVar(value=True)
# Internal state
self.server = None
self.visual_thread = None
self.running = False
self.trace = []
self.create_widgets()
def create_widgets(self):
"""Create and layout all UI elements including buttons, entry fields, and plots."""
frame = ttk.Frame(self.master)
frame.pack(pady=10)
# Config file input
ttk.Label(frame, text="RSI Config File:").grid(row=0, column=0, padx=5)
ttk.Entry(frame, textvariable=self.config_file, width=50).grid(row=0, column=1, padx=5)
ttk.Button(frame, text="Browse", command=self.browse_file).grid(row=0, column=2)
# Mode selection
ttk.Label(frame, text="Mode:").grid(row=1, column=0, padx=5)
ttk.Combobox(frame, textvariable=self.mode, values=["relative", "absolute"], width=10).grid(row=1, column=1, sticky='w')
# Delay input
ttk.Label(frame, text="Delay (ms):").grid(row=2, column=0, padx=5)
ttk.Entry(frame, textvariable=self.delay, width=10).grid(row=2, column=1, sticky='w')
# Show/hide robot checkbox
ttk.Checkbutton(frame, text="Show Robot Stick Figure", variable=self.show_robot).grid(row=3, column=0, sticky='w')
# Start/Stop buttons
ttk.Button(frame, text="Start Server", command=self.start_server).grid(row=4, column=0, pady=10)
ttk.Button(frame, text="Stop Server", command=self.stop_server).grid(row=4, column=1, pady=10)
# Status label
self.status_label = ttk.Label(frame, text="Status: Idle")
self.status_label.grid(row=5, column=0, columnspan=3)
# 3D Plot setup
self.figure = plt.Figure(figsize=(6, 5))
self.ax = self.figure.add_subplot(111, projection='3d')
self.canvas = FigureCanvasTkAgg(self.figure, master=self.master)
self.canvas.get_tk_widget().pack(side=tk.LEFT, fill=tk.BOTH, expand=1)
# XML message display
right_frame = ttk.Frame(self.master)
right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=1)
ttk.Label(right_frame, text="📤 Sent Message").pack()
self.sent_box = tk.Text(right_frame, height=15, width=70)
self.sent_box.pack(pady=5)
ttk.Label(right_frame, text="📩 Received Message").pack()
self.received_box = tk.Text(right_frame, height=15, width=70)
self.received_box.pack(pady=5)
def browse_file(self):
"""Open a file dialog to select a new RSI config file."""
filename = filedialog.askopenfilename(filetypes=[("XML Files", "*.xml")])
if filename:
self.config_file.set(filename)
def start_server(self):
"""
Starts the Echo Server in a background thread and begins the visual update loop.
Validates the existence of the config file first.
"""
if not os.path.exists(self.config_file.get()):
self.status_label.config(text="❌ Config file not found.")
return
self.server = EchoServer(
config_file=self.config_file.get(),
delay_ms=self.delay.get(),
mode=self.mode.get()
)
self.server.start()
self.running = True
self.status_label.config(text=f"✅ Server running in {self.mode.get().upper()} mode.")
self.visual_thread = threading.Thread(target=self.update_visualisation, daemon=True)
self.visual_thread.start()
def stop_server(self):
"""Stops the Echo Server and ends the visual update thread."""
if self.server:
self.server.stop()
self.status_label.config(text="😕 Server stopped.")
self.running = False
def update_visualisation(self):
"""
Continuously updates the 3D plot and message windows with live robot TCP and joint data.
Also displays simplified robot kinematics as a stick figure if enabled.
"""
while self.running:
try:
pos = self.server.state.get("RIst", {})
joints = self.server.state.get("AIPos", {})
x = pos.get("X", 0)
y = pos.get("Y", 0)
z = pos.get("Z", 0)
# Track TCP trace history (max 300 points)
self.trace.append((x, y, z))
if len(self.trace) > 300:
self.trace.pop(0)
self.ax.clear()
self.ax.set_title("3D Robot Movement Trace")
self.ax.set_xlabel("X")
self.ax.set_ylabel("Y")
self.ax.set_zlabel("Z")
# Draw shaded base plane
floor_x, floor_y = np.meshgrid(np.linspace(-200, 200, 2), np.linspace(-200, 200, 2))
floor_z = np.zeros_like(floor_x)
self.ax.plot_surface(floor_x, floor_y, floor_z, alpha=0.2, color='gray')
# Draw TCP trajectory
xs = [pt[0] for pt in self.trace]
ys = [pt[1] for pt in self.trace]
zs = [pt[2] for pt in self.trace]
self.ax.plot(xs, ys, zs, label="TCP Path", color='blue')
# Draw robot as stick figure if enabled
if self.show_robot.get():
link_lengths = [100, 80, 60, 40, 20, 10]
angles = [math.radians(joints.get(f"A{i+1}", 0)) for i in range(6)]
x0, y0, z0 = 0, 0, 0
x_points = [x0]
y_points = [y0]
z_points = [z0]
for i in range(6):
dx = link_lengths[i] * math.cos(angles[i])
dy = link_lengths[i] * math.sin(angles[i])
dz = 0 if i < 3 else link_lengths[i] * math.sin(angles[i])
x0 += dx
y0 += dy
z0 += dz
x_points.append(x0)
y_points.append(y0)
z_points.append(z0)
self.ax.plot(x_points, y_points, z_points, label="Robot Arm", color='red', marker='o')
self.ax.legend()
self.canvas.draw()
# Update message displays
self.received_box.delete("1.0", tk.END)
self.received_box.insert(tk.END, self.server.last_received.strip() if hasattr(self.server, 'last_received') else "")
self.sent_box.delete("1.0", tk.END)
self.sent_box.insert(tk.END, self.server.generate_message().strip())
time.sleep(0.2)
except Exception as e:
print(f"[Visualisation Error] {e}")
if __name__ == "__main__":
root = tk.Tk()
app = EchoServerGUI(root)
root.mainloop()

View File

@ -1,21 +1,26 @@
def inject_rsi_to_krl(input_file, output_file=None, rsi_config="RSIGatewayv1.rsi"):
"""
Injects RSI commands into a KUKA KRL (.src) program file.
Injects RSI commands into a KUKA KRL (.src) program file by:
- Declaring RSI variables.
- Creating the RSI context with a given configuration.
- Starting and stopping RSI execution around the program body.
Args:
input_file (str): Path to the original KRL file.
output_file (str, optional): Path to the output KRL file. Defaults to input_file if None.
rsi_config (str): RSI configuration filename.
output_file (str, optional): Output file to save modified code. Defaults to overwriting input_file.
rsi_config (str): Name of the RSI configuration (usually ending in .rsi).
"""
if output_file is None:
output_file = input_file
output_file = input_file # Overwrite original file if no output specified
# RSI declarations to insert at top
rsi_start = """
; RSI Variable Declarations
DECL INT ret
DECL INT CONTID
"""
# RSI context creation and startup block
rsi_middle = f"""
; Create RSI Context
ret = RSI_CREATE("{rsi_config}", CONTID, TRUE)
@ -30,6 +35,7 @@ IF (ret <> RSIOK) THEN
ENDIF
"""
# RSI shutdown block to insert before END
rsi_end = """
; Stop RSI Execution
ret = RSI_OFF()
@ -38,9 +44,11 @@ IF (ret <> RSIOK) THEN
ENDIF
"""
# Read original KRL file into memory
with open(input_file, "r") as file:
lines = file.readlines()
# Identify key structural markers in the KRL program
header_end, ini_end, end_start = None, None, None
for i, line in enumerate(lines):
@ -51,30 +59,21 @@ ENDIF
elif line.strip().startswith("END"):
end_start = i
# Validate presence of required sections
if header_end is None or ini_end is None or end_start is None:
raise ValueError("Required markers (DEF, ;ENDFOLD (INI), END) not found in KRL file.")
# Inject modified contents into new or overwritten file
with open(output_file, "w") as file:
# Write header and RSI declarations
file.writelines(lines[:header_end + 1])
file.write(rsi_start)
# Write INI section
file.writelines(lines[header_end + 1:ini_end + 1])
# Write RSI context creation and activation
file.write(rsi_middle)
# Write main code section
file.writelines(lines[ini_end + 1:end_start])
# Write RSI deactivation
file.write(rsi_end)
# Write END line
file.write(lines[end_start])
file.writelines(lines[:header_end + 1]) # Preserve header
file.write(rsi_start) # Add RSI declarations
file.writelines(lines[header_end + 1:ini_end + 1]) # Preserve INI block
file.write(rsi_middle) # Insert RSI start commands
file.writelines(lines[ini_end + 1:end_start]) # Preserve main body
file.write(rsi_end) # Insert RSI stop commands
file.write(lines[end_start]) # Write final END line
# Example usage:
# Example usage
if __name__ == "__main__":
inject_rsi_to_krl("my_program.src", "my_program_rsi.src")

View File

@ -5,26 +5,43 @@ from collections import OrderedDict
# noinspection PyTypeChecker
class KRLParser:
"""Parses KUKA KRL .src and .dat files, extracting Cartesian coordinates into CSV."""
"""
Parses KUKA KRL .src and .dat files to extract Cartesian coordinates
and exports them into a structured CSV format.
"""
def __init__(self, src_file, dat_file):
"""
Initialise the parser with the paths to the .src and .dat files.
Args:
src_file (str): Path to the KRL .src file.
dat_file (str): Path to the KRL .dat file.
"""
self.src_file = src_file
self.dat_file = dat_file
self.positions = OrderedDict()
self.positions = OrderedDict() # Maintain order of appearance from .src
def parse_src(self):
"""Parses .src file and collects all positional references."""
"""
Parses the .src file to extract all position references (labels like P1, P2, etc.).
These are later used to match against coordinate data from the .dat file.
"""
pattern = re.compile(r"PDAT_ACT=([A-Z]+\d+)", re.IGNORECASE)
with open(self.src_file, 'r') as file:
for line in file:
match = pattern.search(line)
if match:
pos_ref = match.group(1).upper()
self.positions[pos_ref] = {}
self.positions[pos_ref] = {} # Placeholder for coordinates
print("📌 Extracted labels from .src:", self.positions.keys())
def parse_dat(self):
"""Parses .dat file extracting all Cartesian coordinates."""
"""
Parses the .dat file and retrieves Cartesian coordinate data for each known position reference.
Matches only those positions previously found in the .src file.
"""
pos_pattern = re.compile(
r"DECL E6POS ([A-Z]+\d+)=\{X ([^,]+),Y ([^,]+),Z ([^,]+),A ([^,]+),B ([^,]+),C ([^,]+),S ([^,]+),T ([^,]+)",
re.IGNORECASE
@ -45,14 +62,21 @@ class KRLParser:
'S': int(match.group(8)),
'T': int(match.group(9)),
}
# Only store coordinates if the label was found in the .src
if pos_name in self.positions:
self.positions[pos_name] = coords
print("📄 Current line:", line.strip())
print("📌 Looking for positions in .dat:", self.positions.keys())
def export_csv(self, output_file):
"""Exports the parsed positions to CSV."""
"""
Writes the extracted position data into a CSV file.
Args:
output_file (str): Path to the output CSV file.
"""
fieldnames = ["Sequence", "PosRef", "X", "Y", "Z", "A", "B", "C", "S", "T"]
with open(output_file, 'w', newline='') as csv_file:
@ -60,12 +84,13 @@ class KRLParser:
writer.writeheader()
for idx, (pos_ref, coords) in enumerate(self.positions.items()):
if coords:
if coords: # Skip empty entries (e.g. unmatched labels)
writer.writerow({
"Sequence": idx,
"PosRef": pos_ref,
**coords
})
print("📥 Final positions extracted:", self.positions)

View File

@ -4,23 +4,49 @@ import argparse
import os
class KukaRSIVisualizer:
class KukaRSIVisualiser:
"""
Visualises robot motion and diagnostics from RSI-generated CSV logs.
Supports:
- 3D trajectory plotting (actual vs planned)
- Joint position plotting with safety band overlays
- Force correction trend visualisation
- Optional graph export to PNG
"""
def __init__(self, csv_file, safety_limits=None):
"""
Initialise the visualiser.
Args:
csv_file (str): Path to the RSI CSV log.
safety_limits (dict): Optional dict of axis limits (e.g., {"AIPos.A1": [-170, 170]}).
"""
self.csv_file = csv_file
self.safety_limits = safety_limits or {}
if not os.path.exists(csv_file):
raise FileNotFoundError(f"CSV file {csv_file} not found.")
self.df = pd.read_csv(csv_file)
def plot_trajectory(self, save_path=None):
"""Plots the robot's trajectory in 3D space."""
"""
Plots the 3D robot trajectory from actual and planned data.
Args:
save_path (str): Optional path to save the figure.
"""
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.plot(self.df["RIst.X"], self.df["RIst.Y"], self.df["RIst.Z"], label="Actual Trajectory", linestyle='-')
ax.plot(self.df["RIst.X"], self.df["RIst.Y"], self.df["RIst.Z"],
label="Actual Trajectory", linestyle='-')
if "RSol.X" in self.df.columns:
ax.plot(self.df["RSol.X"], self.df["RSol.Y"], self.df["RSol.Z"], label="Planned Trajectory", linestyle='--')
ax.plot(self.df["RSol.X"], self.df["RSol.Y"], self.df["RSol.Z"],
label="Planned Trajectory", linestyle='--')
ax.set_xlabel("X Position")
ax.set_ylabel("Y Position")
@ -33,10 +59,21 @@ class KukaRSIVisualizer:
plt.show()
def has_column(self, col):
"""
Checks if the given column exists in the dataset.
Args:
col (str): Column name to check.
"""
return col in self.df.columns
def plot_joint_positions(self, save_path=None):
"""Plots joint positions over time with safety limit overlays."""
"""
Plots joint angle positions over time, with optional safety zone overlays.
Args:
save_path (str): Optional path to save the figure.
"""
plt.figure()
time_series = range(len(self.df))
@ -44,7 +81,6 @@ class KukaRSIVisualizer:
if col in self.df.columns:
plt.plot(time_series, self.df[col], label=col)
# 🔴 Overlay safety band
if col in self.safety_limits:
low, high = self.safety_limits[col]
plt.axhspan(low, high, color='red', alpha=0.1, label=f"{col} Safe Zone")
@ -59,7 +95,12 @@ class KukaRSIVisualizer:
plt.show()
def plot_force_trends(self, save_path=None):
"""Plots force trends if force data is logged."""
"""
Plots force correction trends (PosCorr.*) over time, if present.
Args:
save_path (str): Optional path to save the figure.
"""
force_columns = ["PosCorr.X", "PosCorr.Y", "PosCorr.Z"]
plt.figure()
time_series = range(len(self.df))
@ -82,7 +123,12 @@ class KukaRSIVisualizer:
plt.show()
def export_graphs(self, export_dir="exports"):
"""Exports all graphs to PNG/PDF."""
"""
Saves all graphs (trajectory, joints, force) as PNG images.
Args:
export_dir (str): Output directory.
"""
os.makedirs(export_dir, exist_ok=True)
self.plot_trajectory(save_path=os.path.join(export_dir, "trajectory.png"))
self.plot_joint_positions(save_path=os.path.join(export_dir, "joint_positions.png"))
@ -91,7 +137,7 @@ class KukaRSIVisualizer:
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Visualize RSI data logs.")
parser = argparse.ArgumentParser(description="Visualise RSI data logs.")
parser.add_argument("csv_file", type=str, help="Path to the RSI CSV log file.")
parser.add_argument("--export", action="store_true", help="Export graphs as PNG/PDF.")
parser.add_argument("--limits", type=str, help="Optional .rsi.xml file to overlay safety bands")
@ -101,13 +147,13 @@ if __name__ == "__main__":
if args.limits:
from src.RSIPI.rsi_limit_parser import parse_rsi_limits
limits = parse_rsi_limits(args.limits)
visualizer = KukaRSIVisualizer(args.csv_file, safety_limits=limits)
visualiser = Kukarsivisualiser(args.csv_file, safety_limits=limits)
else:
visualizer = KukaRSIVisualizer(args.csv_file)
visualiser = Kukarsivisualiser(args.csv_file)
visualizer.plot_trajectory()
visualizer.plot_joint_positions()
visualizer.plot_force_trends()
visualiser.plot_trajectory()
visualiser.plot_joint_positions()
visualiser.plot_force_trends()
if args.export:
visualizer.export_graphs()
visualiser.export_graphs()

View File

@ -4,7 +4,7 @@ import json
import matplotlib.pyplot as plt
from .rsi_client import RSIClient
from .rsi_graphing import RSIGraphing
from .kuka_visualizer import KukaRSIVisualizer
from .kuka_visualiser import Kukarsivisualiser
from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl
import threading
@ -232,7 +232,7 @@ class RSIAPI:
csv_file (str): Path to CSV log file.
export (bool): Whether to export the plots.
"""
visualizer = KukaRSIVisualizer(csv_file)
visualizer = Kukarsivisualiser(csv_file)
visualizer.plot_trajectory()
visualizer.plot_joint_positions()
visualizer.plot_force_trends()

View File

@ -1,5 +1,5 @@
from .rsi_api import RSIAPI
from .kuka_visualizer import KukaRSIVisualizer
from .kuka_visualiser import Kukarsivisualiser
from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl
@ -299,7 +299,7 @@ Available Commands:
def visualize(self, csv_file, export=False):
try:
visualizer = KukaRSIVisualizer(csv_file)
visualizer = Kukarsivisualiser(csv_file)
visualizer.plot_trajectory()
visualizer.plot_joint_positions()
visualizer.plot_force_trends()

View File

@ -2,20 +2,27 @@ import xml.etree.ElementTree as ET
import logging
from src.RSIPI.rsi_limit_parser import parse_rsi_limits
# ✅ Configure Logging (Allow it to be turned off)
LOGGING_ENABLED = True # Change to False to disable logging
# ✅ Configure Logging (toggleable)
LOGGING_ENABLED = True # Change to False to silence logging output
if LOGGING_ENABLED:
logging.basicConfig(
filename="rsi_config.log", # Save logs to file
level=logging.DEBUG, # Log everything for debugging
filename="rsi_config.log",
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
class RSIConfig:
"""Processes the RSI config.xml file to determine communication parameters."""
class RSIConfig:
"""
Loads and parses the RSI EthernetConfig.xml file, extracting:
- Network communication settings
- Variables to send/receive (with correct structure)
- Optional safety limit data from .rsi.xml file
"""
# Known internal RSI variables and their structure
internal = {
"ComStatus": "String",
"RIst": ["X", "Y", "Z", "A", "B", "C"],
@ -37,6 +44,13 @@ class RSIConfig:
}
def __init__(self, config_file, rsi_limits_file=None):
"""
Initialise config loader.
Args:
config_file (str): Path to the RSI EthernetConfig.xml file.
rsi_limits_file (str): Optional path to .rsi.xml safety limits.
"""
self.config_file = config_file
self.rsi_limits_file = rsi_limits_file
self.safety_limits = {}
@ -46,9 +60,10 @@ class RSIConfig:
self.receive_variables = {}
self.load_config()
self.load_safety_limits() # ✅ NEW STEP
self.load_safety_limits() # Optional safety overlay
def load_safety_limits(self):
"""Loads safety bands from an optional .rsi.xml file, if provided."""
if self.rsi_limits_file:
try:
self.safety_limits = parse_rsi_limits(self.rsi_limits_file)
@ -59,19 +74,26 @@ class RSIConfig:
@staticmethod
def strip_def_prefix(tag):
"""Remove 'DEF_' prefix from a variable name."""
"""Removes DEF_ prefix from variable names."""
return tag.replace("DEF_", "")
def process_internal_variable(self, tag):
"""Process structured internal variables."""
"""Initialises structured internal variables based on known RSI types."""
if tag in self.internal:
if isinstance(self.internal[tag], list):
return {key: 0.0 for key in self.internal[tag]} # Store as dict
return self.internal[tag] # Store as default value
return {key: 0.0 for key in self.internal[tag]}
return self.internal[tag]
return None
def process_variable_structure(self, var_dict, tag, var_type):
"""Handles structured variables (e.g., `Tech.T2`, `RKorr.X`)."""
"""
Parses and groups structured variables, e.g., Tech.T2 {'Tech': {'T2': 0.0}}.
Args:
var_dict (dict): Either send_variables or receive_variables.
tag (str): The variable tag from XML.
var_type (str): The TYPE attribute from XML.
"""
if tag in self.internal:
var_dict[tag] = self.process_internal_variable(tag)
elif "." in tag:
@ -84,7 +106,7 @@ class RSIConfig:
@staticmethod
def get_default_value(var_type):
"""Returns a default value based on TYPE."""
"""Returns a suitable default value for a given variable type."""
if var_type == "BOOL":
return False
elif var_type == "STRING":
@ -93,16 +115,20 @@ class RSIConfig:
return 0
elif var_type == "DOUBLE":
return 0.0
return None # Unknown type fallback
return None # Fallback for unknown types
def load_config(self):
"""Load and parse the config.xml file, ensuring variables are structured correctly."""
"""
Parses the RSI config.xml, extracting:
- IP/port and communication mode
- Structured send and receive variable templates
"""
try:
logging.info(f"Loading config file: {self.config_file}")
tree = ET.parse(self.config_file)
root = tree.getroot()
# Read Network Settings
# Extract <CONFIG> network settings
config = root.find("CONFIG")
self.network_settings = {
"ip": config.find("IP_NUMBER").text.strip(),
@ -112,22 +138,20 @@ class RSIConfig:
}
logging.info(f"Network settings loaded: {self.network_settings}")
# Read SEND Section (Values we send to RSI client)
# Extract <SEND> section
send_section = root.find("SEND/ELEMENTS")
for element in send_section.findall("ELEMENT"):
tag = self.strip_def_prefix(element.get("TAG"))
var_type = element.get("TYPE")
if tag != "FREE": # ✅ Ignore `FREE` variables
if tag != "FREE": # Ignore placeholder entries
self.process_variable_structure(self.send_variables, tag, var_type)
# Read RECEIVE Section (Values we receive from RSI client)
# Extract <RECEIVE> section
receive_section = root.find("RECEIVE/ELEMENTS")
for element in receive_section.findall("ELEMENT"):
tag = self.strip_def_prefix(element.get("TAG"))
var_type = element.get("TYPE")
if tag != "FREE": # ✅ Ignore `FREE` variables
if tag != "FREE":
self.process_variable_structure(self.receive_variables, tag, var_type)
logging.info("Configuration successfully loaded.")
@ -138,10 +162,13 @@ class RSIConfig:
logging.error(f"Error loading {self.config_file}: {e}")
def get_network_settings(self):
"""Returns network configuration (IP, port, SENTYPE, ONLYSEND)."""
return self.network_settings
def get_send_variables(self):
"""Returns structured send variable dictionary."""
return self.send_variables
def get_receive_variables(self):
"""Returns structured receive variable dictionary."""
return self.receive_variables

View File

@ -1,4 +1,3 @@
import socket
import time
import xml.etree.ElementTree as ET
@ -6,6 +5,7 @@ import logging
import threading
from src.RSIPI.rsi_config import RSIConfig
# ✅ Toggle logging for debugging purposes
LOGGING_ENABLED = True
if LOGGING_ENABLED:
@ -16,21 +16,39 @@ if LOGGING_ENABLED:
datefmt="%Y-%m-%d %H:%M:%S"
)
class EchoServer:
"""
Simulates a KUKA RSI UDP server for testing.
- Responds to incoming RSI correction commands.
- Updates internal position state (absolute/relative).
- Returns structured XML messages (like a real robot).
"""
def __init__(self, config_file, delay_ms=4, mode="relative"):
"""
Initialise the echo server.
Args:
config_file (str): Path to RSI EthernetConfig.xml.
delay_ms (int): Delay between messages in milliseconds.
mode (str): Correction mode ("relative" or "absolute").
"""
self.config = RSIConfig(config_file)
network_settings = self.config.get_network_settings()
self.server_address = ("0.0.0.0", 50000)
self.client_address = ("127.0.0.1", network_settings["port"])
self.server_address = ("0.0.0.0", 50000) # Local bind
self.client_address = ("127.0.0.1", network_settings["port"]) # Client to echo back to
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.bind(self.server_address)
self.last_received = None
self.ipoc_value = 123456
self.delay_ms = delay_ms / 1000
self.mode = mode.lower() # relative or absolute
self.delay_ms = delay_ms / 1000 # Convert to seconds
self.mode = mode.lower()
# Internal simulation state
# Internal state to simulate robot values
self.state = {
"RIst": {k: 0.0 for k in ["X", "Y", "Z", "A", "B", "C"]},
"AIPos": {f"A{i}": 0.0 for i in range(1, 7)},
@ -46,12 +64,16 @@ class EchoServer:
print(f"✅ Echo Server started in {self.mode.upper()} mode.")
def receive_and_process(self):
"""
Handles one incoming UDP message and updates the internal state accordingly.
Supports RKorr, AKorr, DiO, DiL, and IPOC updates.
"""
try:
self.udp_socket.settimeout(self.delay_ms)
data, addr = self.udp_socket.recvfrom(1024)
xml_string = data.decode()
root = ET.fromstring(xml_string)
self.last_received = xml_string # ✅ Save for GUI
self.last_received = xml_string
for elem in root:
tag = elem.tag
@ -59,11 +81,13 @@ class EchoServer:
for axis, value in elem.attrib.items():
value = float(value)
if tag == "RKorr" and axis in self.state["RIst"]:
# Apply Cartesian correction
if self.mode == "relative":
self.state["RIst"][axis] += value
else:
self.state["RIst"][axis] = value
elif tag == "AKorr" and axis in self.state["AIPos"]:
# Apply joint correction
if self.mode == "relative":
self.state["AIPos"][axis] += value
else:
@ -76,7 +100,7 @@ class EchoServer:
logging.debug(f"✅ Processed input: {ET.tostring(root).decode()}")
except socket.timeout:
pass # Just ignore and continue
pass # No data within delay window
except ConnectionResetError:
print("⚠️ Connection was reset by client. Waiting before retry...")
time.sleep(0.5)
@ -84,6 +108,10 @@ class EchoServer:
print(f"[ERROR] Failed to process input: {e}")
def generate_message(self):
"""
Creates a reply XML message based on current state.
Format matches KUKA RSI's expected response structure.
"""
root = ET.Element("Rob", Type="KUKA")
for key in ["RIst", "AIPos", "ELPos"]:
@ -98,6 +126,10 @@ class EchoServer:
return ET.tostring(root, encoding="utf-8").decode()
def send_message(self):
"""
Main loop to receive input, update state, and send reply.
Runs in a background thread until stopped.
"""
while self.running:
try:
self.receive_and_process()
@ -110,16 +142,19 @@ class EchoServer:
time.sleep(1)
def start(self):
"""Starts the echo server loop in a background thread."""
self.running = True
self.thread.start()
def stop(self):
"""Stops the echo server and cleans up the socket."""
print("🛑 Stopping Echo Server...")
self.running = False
self.thread.join()
self.udp_socket.close()
print("✅ Echo Server Stopped.")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Run Echo Server for RSI Simulation")

View File

@ -6,20 +6,33 @@ from .rsi_client import RSIClient
import csv
class RSIGraphing:
"""Handles real-time and CSV-based graphing for RSI analysis with alerts and threshold monitoring."""
"""
Handles real-time plotting of RSI data with support for:
- Position/velocity/acceleration/force monitoring
- Live deviation alerts
- Optional overlay of planned vs actual trajectories
"""
def __init__(self, client, mode="position", overlay=False, plan_file=None):
"""Initialize graphing for real-time plotting."""
"""
Initialise live graphing interface.
Args:
client (RSIClient): Live RSI client instance providing data.
mode (str): One of "position", "velocity", "acceleration", "force".
overlay (bool): Whether to show planned vs. actual overlays.
plan_file (str): Optional CSV file containing planned trajectory.
"""
self.client = client
self.mode = mode
self.overlay = overlay # Enable/Disable planned vs. actual overlay
self.alerts_enabled = True # Default to alerts on
self.deviation_threshold = 5.0 # Default deviation threshold (mm)
self.force_threshold = 10.0 # Default force threshold (Nm)
self.overlay = overlay
self.alerts_enabled = True
self.deviation_threshold = 5.0 # mm
self.force_threshold = 10.0 # Nm
self.fig, self.ax = plt.subplots(figsize=(10, 6))
# ✅ Data storage
self.time_data = deque(maxlen=100) # Store last 100 data points
# Live data buffers
self.time_data = deque(maxlen=100)
self.position_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]}
self.velocity_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]}
self.acceleration_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]}
@ -29,7 +42,7 @@ class RSIGraphing:
self.previous_velocities = {"X": 0, "Y": 0, "Z": 0}
self.previous_time = time.time()
# ✅ Planned movement overlay storage
# Overlay comparison
self.planned_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]}
self.deviation_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]}
@ -40,16 +53,18 @@ class RSIGraphing:
plt.show()
def update_graph(self, frame):
"""Update the live graph based on selected mode and check for alerts."""
"""
Called periodically by matplotlib to refresh live graph based on current mode.
Also checks for force spikes and deviation alerts.
"""
current_time = time.time()
dt = current_time - self.previous_time
self.previous_time = current_time
# ✅ Fetch live data from RSI client
position = self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0})
force = self.client.receive_variables.get("MaCur", {"A1": 0, "A2": 0, "A3": 0, "A4": 0, "A5": 0, "A6": 0})
# ✅ Compute velocity and acceleration
# Compute motion derivatives
for axis in ["X", "Y", "Z"]:
velocity = (position[axis] - self.previous_positions[axis]) / dt if dt > 0 else 0
acceleration = (velocity - self.previous_velocities[axis]) / dt if dt > 0 else 0
@ -65,7 +80,7 @@ class RSIGraphing:
self.time_data.append(time.strftime("%H:%M:%S"))
# ✅ Planned vs. Actual movement overlay
# Compare to planned overlay
if self.overlay and self.planned_data:
for axis in ["X", "Y", "Z"]:
planned_value = self.planned_data[axis][-1] if len(self.planned_data[axis]) > 0 else position[axis]
@ -73,17 +88,14 @@ class RSIGraphing:
deviation = abs(position[axis] - planned_value)
self.deviation_data[axis].append(deviation)
# ✅ Trigger deviation alert
if self.alerts_enabled and deviation > self.deviation_threshold:
print(f"⚠️ Deviation Alert! {axis} exceeds {self.deviation_threshold} mm (Deviation: {deviation:.2f} mm)")
# ✅ Trigger force spike alert
if self.alerts_enabled:
for axis in ["A1", "A2", "A3", "A4", "A5", "A6"]:
if self.force_data[axis][-1] > self.force_threshold:
print(f"⚠️ Force Spike Alert! {axis} exceeds {self.force_threshold} Nm (Force: {self.force_data[axis][-1]:.2f} Nm)")
# ✅ Clear the plot
self.ax.clear()
if self.mode == "position":
@ -103,7 +115,7 @@ class RSIGraphing:
self.ax.tick_params(axis='x', rotation=45)
def change_mode(self, mode):
"""Change the graphing mode dynamically."""
"""Switch graphing mode at runtime (position, velocity, acceleration, force)."""
if mode in ["position", "velocity", "acceleration", "force"]:
self.mode = mode
print(f"✅ Graphing mode changed to: {mode}")
@ -111,7 +123,7 @@ class RSIGraphing:
print("❌ Invalid mode. Available: position, velocity, acceleration, force")
def set_alert_threshold(self, alert_type, threshold):
"""Set threshold for deviation or force alerts."""
"""Update threshold values for alerts."""
if alert_type == "deviation":
self.deviation_threshold = threshold
elif alert_type == "force":
@ -119,18 +131,27 @@ class RSIGraphing:
print(f"{alert_type.capitalize()} alert threshold set to {threshold}")
def enable_alerts(self, enable):
"""Enable or disable alerts."""
"""Enable or disable real-time alerts."""
self.alerts_enabled = enable
print(f"✅ Alerts {'enabled' if enable else 'disabled'}.")
def stop(self):
"""Stop live plotting loop by closing the figure."""
"""Gracefully stop plotting by closing the figure."""
plt.close(self.fig)
def plot_csv_file(csv_path):
"""Plot X/Y/Z position over time from a CSV log file."""
import matplotlib.pyplot as plt
def load_plan(self, plan_file):
"""Load planned XYZ trajectory from CSV for overlay comparison."""
with open(plan_file, newline='') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
for axis in ["X", "Y", "Z"]:
key = f"Send.RKorr.{axis}"
value = float(row.get(key, 0.0))
self.planned_data[axis].append(value)
@staticmethod
def plot_csv_file(csv_path):
"""Standalone method to plot XYZ position from a log file (no live client required)."""
timestamps = []
x_data, y_data, z_data = [], [], []
@ -154,6 +175,7 @@ class RSIGraphing:
plt.tight_layout()
plt.show()
if __name__ == "__main__":
import argparse
@ -169,4 +191,3 @@ if __name__ == "__main__":
if not args.alerts:
graphing.enable_alerts(False)

View File

@ -1,9 +1,14 @@
import xml.etree.ElementTree as ET
def parse_rsi_limits(xml_path):
"""
Parses a .rsi.xml file (RSIObject format) and returns structured safety limits.
Returns:
dict: Structured limits in the form { "RKorr.X": (min, max), "AKorr.A1": (min, max), ... }
"""
tree = ET.parse(xml_path)
root = tree.getroot()
raw_limits = {}
for rsi_object in root.findall("RSIObject"):
@ -11,9 +16,10 @@ def parse_rsi_limits(xml_path):
params = rsi_object.find("Parameters")
if params is None:
continue
continue # Skip malformed entries
if obj_type == "POSCORR":
# Cartesian position correction limits
for param in params.findall("Parameter"):
name = param.attrib["Name"]
value = float(param.attrib["ParamValue"])
@ -30,11 +36,13 @@ def parse_rsi_limits(xml_path):
elif name == "UpperLimZ":
raw_limits["RKorr.Z_max"] = value
elif name == "MaxRotAngle":
# Apply symmetric bounds to A/B/C
for axis in ["A", "B", "C"]:
raw_limits[f"RKorr.{axis}_min"] = -value
raw_limits[f"RKorr.{axis}_max"] = value
elif obj_type == "AXISCORR":
# Joint axis correction limits
for param in params.findall("Parameter"):
name = param.attrib["Name"]
value = float(param.attrib["ParamValue"])
@ -44,6 +52,7 @@ def parse_rsi_limits(xml_path):
raw_limits[key] = value
elif obj_type == "AXISCORREXT":
# External axis correction limits
for param in params.findall("Parameter"):
name = param.attrib["Name"]
value = float(param.attrib["ParamValue"])
@ -52,7 +61,7 @@ def parse_rsi_limits(xml_path):
key = f"AKorr.E{axis}_{'min' if 'Lower' in name else 'max'}"
raw_limits[key] = value
# Build structured dictionary with only valid (min, max) pairs
# Combine _min and _max entries into structured tuples
structured_limits = {}
for key in list(raw_limits.keys()):
if key.endswith("_min"):

View File

@ -2,9 +2,19 @@ import logging
class SafetyManager:
"""
Enforces safety limits for RSI motion commands.
Supports:
- Emergency stop logic (halts all validation)
- Limit enforcement for RKorr / AKorr / other variables
- Runtime limit updates
"""
def __init__(self, limits=None):
"""
limits: Optional dictionary of variable limits in the form:
Args:
limits (dict): Optional safety limits in the form:
{
'RKorr.X': (-5.0, 5.0),
'AKorr.A1': (-6.0, 6.0),
@ -13,13 +23,22 @@ class SafetyManager:
"""
self.limits = limits if limits is not None else {}
self.e_stop = False
self.last_values = {}
self.last_values = {} # Reserved for future tracking or override detection
def validate(self, path: str, value: float) -> float:
"""
Validate a single variable update against safety rules.
Raises RuntimeError if E-Stop is active.
Raises ValueError if out of allowed bounds.
Validates an individual correction value.
Args:
path (str): Key of the variable, e.g., "RKorr.X"
value (float): Proposed value to apply
Returns:
float: Value if valid
Raises:
RuntimeError: If emergency stop is active
ValueError: If value exceeds allowed bounds
"""
if self.e_stop:
logging.warning(f"SafetyManager: {path} update blocked (E-STOP active)")
@ -34,21 +53,21 @@ class SafetyManager:
return value
def emergency_stop(self):
"""Activate emergency stop mode (blocks all motion)."""
"""Activates emergency stop: all motion validation will fail."""
self.e_stop = True
def reset_stop(self):
"""Reset emergency stop (motion allowed again)."""
"""Resets emergency stop, allowing motion again."""
self.e_stop = False
def set_limit(self, path: str, min_val: float, max_val: float):
"""Manually set or override a safety limit at runtime."""
"""Sets or overrides a safety limit at runtime."""
self.limits[path] = (min_val, max_val)
def get_limits(self):
"""Return a copy of current limits."""
"""Returns a copy of all current safety limits."""
return self.limits.copy()
def is_stopped(self):
"""Returns whether the emergency stop is active."""
return self.e_stop

View File

@ -1,19 +1,20 @@
from RSIPI.safety_manager import SafetyManager
import time
def generate_trajectory(start, end, steps=100, space="cartesian", mode="relative", include_resets=True):
"""
Generates a trajectory from start to end in N steps.
Generates a trajectory from start to end across N steps.
Args:
start (dict): starting pose/position
end (dict): ending pose/position
steps (int): number of points in the trajectory
space (str): "cartesian" or "joint" (for logging/use clarity)
mode (str): "relative" or "absolute"
include_resets (bool): if True, adds reset-to-zero between pulses (only for relative mode)
start (dict): Starting pose or joint values.
end (dict): Target pose or joint values.
steps (int): Number of interpolation steps.
space (str): "cartesian" or "joint" (for API dispatch and validation).
mode (str): "relative" or "absolute" movement type.
include_resets (bool): Inserts zero-correction resets between steps in relative mode.
Returns:
list[dict]: trajectory points
list[dict]: List of movement command dictionaries.
"""
if mode not in ["relative", "absolute"]:
raise ValueError("mode must be 'relative' or 'absolute'")
@ -23,7 +24,10 @@ def generate_trajectory(start, end, steps=100, space="cartesian", mode="relative
axes = start.keys()
trajectory = []
# Optional safety check hook — assumes SafetyManager has static validation methods
safety_fn = SafetyManager.check_cartesian_limits if space == "cartesian" else SafetyManager.check_joint_limits
global enforce_safety
enforce_safety = hasattr(SafetyManager, "check_cartesian_limits") # Enable only if those methods exist
for i in range(1, steps + 1):
point = {}
@ -31,25 +35,29 @@ def generate_trajectory(start, end, steps=100, space="cartesian", mode="relative
delta = end[axis] - start[axis]
value = start[axis] + (delta * i / steps)
if mode == "relative":
point[axis] = delta / steps
else:
point[axis] = value
point[axis] = delta / steps if mode == "relative" else value
# Optional safety enforcement
if enforce_safety and not safety_fn(point):
raise ValueError(f"⚠️ Safety check failed at step {i}: {point}")
trajectory.append(point)
if mode == "relative" and include_resets:
# Insert a zero-correction step to prevent drift
trajectory.append({axis: 0.0 for axis in axes})
return trajectory
def execute_trajectory(api, trajectory, space="cartesian", rate=0.004):
"""
Sends a trajectory by updating send variables at a fixed rate.
Use only with absolute motion or slow relative steps.
Sends a list of corrections to the RSI API at fixed intervals.
Args:
api: An RSI-compatible API object with update_cartesian / update_joints methods.
trajectory (list[dict]): Movement steps generated by generate_trajectory().
space (str): "cartesian" or "joint".
rate (float): Time between steps in seconds (default = 4ms).
"""
for point in trajectory:
if space == "cartesian":

View File

@ -1,16 +1,29 @@
import xml.etree.ElementTree as ET
class XMLGenerator:
"""Converts send and receive variables into properly formatted XML messages."""
"""
Converts structured dictionaries of RSI send/receive variables into
valid XML strings for UDP transmission to/from the robot controller.
"""
@staticmethod
def generate_send_xml(send_variables, network_settings):
"""Generate the send XML message dynamically based on send variables."""
"""
Build an outgoing XML message based on the current send variables.
Args:
send_variables (dict): Structured dictionary of values to send.
network_settings (dict): Contains 'sentype' used for the root element.
Returns:
str: XML-formatted string ready for UDP transmission.
"""
root = ET.Element("Sen", Type=network_settings["sentype"])
print("XML Send Variables : ", send_variables)
# Convert structured keys (e.g. RKorr, Tech) and flat elements (e.g. IPOC)
for key, value in send_variables.items():
if key == "FREE":
continue
continue # Skip unused placeholder fields
if isinstance(value, dict):
element = ET.SubElement(root, key)
@ -23,16 +36,23 @@ class XMLGenerator:
@staticmethod
def generate_receive_xml(receive_variables):
"""Generate the receive XML message dynamically based on receive variables."""
root = ET.Element("Rob", Type="KUKA") # ✅ Root with Type="KUKA"
"""
Build an incoming XML message for emulation/testing purposes.
Args:
receive_variables (dict): Structured dictionary of values to simulate reception.
Returns:
str: XML-formatted string mimicking a KUKA robot's reply.
"""
root = ET.Element("Rob", Type="KUKA")
for key, value in receive_variables.items():
if isinstance(value, dict) or hasattr(value, "items"): # ✅ Handle dictionaries as elements with attributes
if isinstance(value, dict) or hasattr(value, "items"):
element = ET.SubElement(root, key)
for sub_key, sub_value in value.items():
element.set(sub_key, f"{float(sub_value):.2f}")
else: # ✅ Handle standard elements with text values
else:
ET.SubElement(root, key).text = str(value)
return ET.tostring(root, encoding="utf-8").decode()