Compare commits

...

2 Commits

Author SHA1 Message Date
a81aef45b5 Mainly fixed a lot of cli command.
Fixed a lot of api commands,
Tidies up rsi client
added a lot of trajectory stuff
fixed some xml handling.
Some other stuff I cant remember.
2025-04-04 00:08:41 +01:00
663602cff0 Removed a load of debug prints 2025-04-03 11:34:12 +01:00
8 changed files with 244 additions and 128 deletions

View File

@ -1,41 +1,36 @@
from src.RSIPI.rsi_client import RSIClient import time
from time import sleep from RSIPI.rsi_api import RSIAPI
from RSIPI.trajectory_planner import generate_trajectory
from RSIPI.rsi_graphing import plot_csv_file
if __name__ == "__main__":
# config_file = "RSI_EthernetConfig.xml" # Ensure this file exists in the working directory
# client = RSIClient(config_file)
# client.start()
#
# # print("done")
# #
# # # client.stop()
# sleep(5)
# print("rdfsfsdfsfsdfjsjfhakjshfd")
# client.update_send_variable("EStr", "Testing 123 Testing")
# sleep(20)
# client.stop()
from src.RSIPI.rsi_api import RSIAPI
from time import sleep
api = RSIAPI() def main():
api = RSIAPI("RSI_EthernetConfig.xml")
print("⚙️ Starting RSI...")
api.start_rsi() api.start_rsi()
sleep(4)
# Dynamically update a variable
api.update_variable("EStr", "Tessting 123")
sleep(5) print("📦 Starting CSV log...")
api.start_logging() # Automatically creates a timestamped file
time.sleep(1.0) # Let handshake and logging settle
# ✅ Send a few small RKorr pulses
print("🔁 Sending RKorr pulses...")
for _ in range(5):
api.update_variable("RKorr.X", 0.5)
time.sleep(0.008) # 2 cycles
api.update_variable("RKorr.X", 0.0)
time.sleep(0.008)
print("🛑 Stopping CSV log...")
api.stop_logging()
print("🛑 Stopping RSI...")
api.stop_rsi() api.stop_rsi()
# from src.RSIPI.rsi_api import RSIAPI print("✅ Log complete. You can now visualise it with rsi_graphing or plot_csv_file().")
# from time import sleep
# def main():
# api = RSIAPI() if __name__ == "__main__":
# response = api.start_rsi() main()
# sleep(50)
# print(response)
#
#
#
# if __name__ == "__main__":
# main()

View File

@ -60,14 +60,10 @@ class NetworkProcess(multiprocessing.Process):
self.udp_socket.settimeout(5) self.udp_socket.settimeout(5)
data_received, self.controller_ip_and_port = self.udp_socket.recvfrom(1024) data_received, self.controller_ip_and_port = self.udp_socket.recvfrom(1024)
print("Receive: ", data_received) print("Receive: ", data_received)
print("HERE 1")
message = data_received.decode() message = data_received.decode()
print("HERE 2")
self.process_received_data(message) self.process_received_data(message)
print("HERE 3")
print("Network :", self.send_variables) print("Network :", self.send_variables)
send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings) send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings)
print("HERE 4")
print("Send:", send_xml) print("Send:", send_xml)
self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port) self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port)

View File

@ -1,4 +1,3 @@
import multiprocessing
import pandas as pd import pandas as pd
import numpy as np import numpy as np
import json import json
@ -8,19 +7,10 @@ from .rsi_graphing import RSIGraphing
from .kuka_visualizer import KukaRSIVisualizer from .kuka_visualizer import KukaRSIVisualizer
from .krl_to_csv_parser import KRLParser from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl from .inject_rsi_to_krl import inject_rsi_to_krl
import threading # (Put this at the top of the file) import threading
from threading import Thread from threading import Thread
from .trajectory_planner import generate_trajectory, execute_trajectory from .trajectory_planner import generate_trajectory, execute_trajectory
import datetime
def compare_test_runs(file1, file2):
"""Compare two movement logs."""
df1 = pd.read_csv(file1)
df2 = pd.read_csv(file2)
diff = abs(df1 - df2)
max_deviation = diff.max()
return f"📊 Max Deviation: {max_deviation}"
class RSIAPI: class RSIAPI:
@ -28,6 +18,7 @@ class RSIAPI:
def __init__(self, config_file="RSI_EthernetConfig.xml"): def __init__(self, config_file="RSI_EthernetConfig.xml"):
"""Initialize RSIAPI with an RSI client instance.""" """Initialize RSIAPI with an RSI client instance."""
self.thread = None
self.client = RSIClient(config_file) self.client = RSIClient(config_file)
self.graph_process = None # Store graphing process self.graph_process = None # Store graphing process
self.graphing_instance = None self.graphing_instance = None
@ -57,6 +48,25 @@ class RSIAPI:
except Exception as e: except Exception as e:
return f"❌ Failed to update {variable}: {e}" return f"❌ Failed to update {variable}: {e}"
def show_variables(self, group: str = "all"):
"""
Prints current values of send/receive variable groups.
"""
import pprint
live_data = self.get_live_data()
if group == "all":
pprint.pprint(live_data)
else:
found = False
for key in live_data:
if key.lower() == group.lower():
pprint.pprint({key: live_data[key]})
found = True
break
if not found:
print(f"⚠️ Group '{group}' not found. Try one of: {', '.join(live_data.keys())}")
def get_variables(self): def get_variables(self):
"""Retrieve current send and receive variables.""" """Retrieve current send and receive variables."""
return { return {
@ -121,10 +131,6 @@ class RSIAPI:
self.client.update_send_variable(tech_param, float(value)) self.client.update_send_variable(tech_param, float(value))
return f"✅ Set {tech_param} to {value}" return f"✅ Set {tech_param} to {value}"
def override_safety(self, limit):
"""Override safety limits."""
return f"⚠️ Overriding safety limit: {limit}"
def reset_variables(self): def reset_variables(self):
"""Reset send variables to default values.""" """Reset send variables to default values."""
self.client.reset_send_variables() self.client.reset_send_variables()
@ -138,11 +144,13 @@ class RSIAPI:
"receive_variables": dict(self.client.receive_variables) "receive_variables": dict(self.client.receive_variables)
} }
# ✅ CSV LOGGING METHODS def start_logging(self, filename=None):
def start_logging(self, filename): if not filename:
"""Start logging RSI data to CSV.""" timestamp = datetime.datetime.now().strftime("%d-%m-%Y_%H-%M-%S")
filename = f"logs/{timestamp}.csv"
self.client.start_logging(filename) self.client.start_logging(filename)
return f"✅ CSV Logging started: {filename}" return filename
def stop_logging(self): def stop_logging(self):
"""Stop logging RSI data.""" """Stop logging RSI data."""
@ -153,7 +161,6 @@ class RSIAPI:
"""Return logging status.""" """Return logging status."""
return self.client.is_logging_active() return self.client.is_logging_active()
# ✅ GRAPHING METHODS
def start_graphing(self, mode="position", overlay=False, plan_file=None): def start_graphing(self, mode="position", overlay=False, plan_file=None):
if self.graph_thread and self.graph_thread.is_alive(): if self.graph_thread and self.graph_thread.is_alive():
return "⚠️ Graphing is already running." return "⚠️ Graphing is already running."
@ -177,6 +184,12 @@ class RSIAPI:
self.client.enable_alerts(enable) self.client.enable_alerts(enable)
return f"✅ Alerts {'enabled' if enable else 'disabled'}." return f"✅ Alerts {'enabled' if enable else 'disabled'}."
def override_safety(self, enabled: bool):
self.client.safety_manager.override_safety(enabled)
def is_safety_overridden(self) -> bool:
return self.client.safety_manager.is_safety_overridden()
def set_alert_threshold(self, alert_type, value): def set_alert_threshold(self, alert_type, value):
"""Set threshold for deviation or force alerts.""" """Set threshold for deviation or force alerts."""
if alert_type in ["deviation", "force"]: if alert_type in ["deviation", "force"]:
@ -359,3 +372,40 @@ class RSIAPI:
f.write("\n".join(report_lines)) f.write("\n".join(report_lines))
return f"✅ Report generated: {output}" return f"✅ Report generated: {output}"
def update_cartesian(self, **kwargs):
"""
Update Cartesian correction values (RKorr).
Example: update_cartesian(X=10.0, Y=0.0, Z=0.0)
"""
for axis, value in kwargs.items():
self.update_variable(f"RKorr.{axis}", float(value))
def update_joints(self, **kwargs):
for axis, value in kwargs.items():
self.update_variable(f"AKorr.{axis}", float(value))
def watch_network(self, duration: float = None, rate: float = 0.2):
"""
Continuously prints current receive variables (and IPOC).
If duration is None, runs until interrupted.
"""
import time
import datetime
print("📡 Watching network... Press Ctrl+C to stop.\n")
start_time = time.time()
try:
while True:
live_data = self.get_live_data()
ipoc = live_data.get("IPOC", "N/A")
rpos = live_data.get("RIst", {})
print(f"[{datetime.datetime.now().strftime('%H:%M:%S')}] IPOC: {ipoc} | RIst: {rpos}")
time.sleep(rate)
if duration and (time.time() - start_time) >= duration:
break
except KeyboardInterrupt:
print("\n🛑 Stopped network watch.")

View File

@ -1,4 +1,4 @@
from .rsi_client import RSIClient from .rsi_api import RSIAPI
from .kuka_visualizer import KukaRSIVisualizer from .kuka_visualizer import KukaRSIVisualizer
from .krl_to_csv_parser import KRLParser from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl from .inject_rsi_to_krl import inject_rsi_to_krl
@ -6,12 +6,10 @@ from .inject_rsi_to_krl import inject_rsi_to_krl
class RSICommandLineInterface: class RSICommandLineInterface:
"""Command-Line Interface for controlling RSI Client.""" """Command-Line Interface for controlling RSI Client."""
def __init__(self, config_file): def __init__(self, input_config_file):
"""Initialize CLI with an RSI API instance.""" """Initialize CLI with an RSI API instance."""
self.client = RSIClient(config_file) self.client = RSIAPI(input_config_file)
self.running = True self.running = True
self.rsi_thread = None # Store RSI thread
self.graph_process = None # Store graphing process
def run(self): def run(self):
"""Starts the CLI interaction loop.""" """Starts the CLI interaction loop."""
@ -29,42 +27,77 @@ class RSICommandLineInterface:
cmd = parts[0] cmd = parts[0]
if cmd == "start": if cmd == "start":
self.start_rsi() self.client.start_rsi()
elif cmd == "stop": elif cmd == "stop":
self.stop_rsi() self.client.stop_rsi()
elif cmd == "set" and len(parts) >= 3: elif cmd == "set" and len(parts) >= 3:
variable, value = parts[1], " ".join(parts[2:]) variable, value = parts[1], " ".join(parts[2:])
self.update_variable(variable, value) self.client.update_variable(variable, value)
elif cmd == "alerts" and len(parts) == 2: elif cmd == "alerts" and len(parts) == 2:
self.toggle_alerts(parts[1]) self.toggle_alerts(parts[1])
elif cmd == "set_alert_threshold" and len(parts) == 3: elif cmd == "set_alert_threshold" and len(parts) == 3:
self.set_alert_threshold(parts[1], parts[2]) self.set_alert_threshold(parts[1], parts[2])
elif cmd == "show": elif cmd == "show":
self.show_variables() if len(parts) == 0:
group = "all"
else:
group = parts[0]
self.client.show_variables(group)
elif cmd == "ipoc": elif cmd == "ipoc":
self.show_ipoc() self.client.get_ipoc()
elif cmd == "watch": elif cmd == "watch":
self.watch_network() duration = float(parts[0]) if parts else None
self.client.watch_network(duration=duration)
elif cmd == "reset": elif cmd == "reset":
self.reset_variables() self.client.reset_variables()
elif cmd == "status": elif cmd == "status":
self.show_status() self.client.get_status()
elif cmd == "reconnect": elif cmd == "reconnect":
self.reconnect() self.client.reconnect()
elif cmd == "toggle" and len(parts) == 3: elif cmd == "toggle" and len(parts) == 3:
self.toggle_digital_io(parts[1], parts[2]) self.client.toggle_digital_io(parts[1], parts[2])
elif cmd == "move_external" and len(parts) == 3: elif cmd == "move_external" and len(parts) == 3:
self.move_external_axis(parts[1], parts[2]) self.client.move_external_axis(parts[1], parts[2])
elif cmd == "correct" and len(parts) == 4: elif cmd == "correct" and len(parts) == 4:
self.correct_position(parts[1], parts[2], parts[3]) self.client.correct_position(parts[1], parts[2], parts[3])
elif cmd == "speed" and len(parts) == 3: elif cmd == "speed" and len(parts) == 3:
self.adjust_speed(parts[1], parts[2]) self.client.adjust_speed(parts[1], parts[2])
elif cmd == "override" and len(parts) == 2: elif cmd == "override" and len(parts) == 2:
self.override_safety(parts[1]) self.client.override_safety(parts[1])
elif cmd == "log" and len(parts) >= 2: elif cmd == "log":
self.handle_logging_command(parts) if len(parts) < 1:
elif cmd == "graph" and len(parts) >= 2: print("⚠️ Usage: log start|stop|status")
self.handle_graphing_command(parts) return
subcmd = parts[0].lower()
if subcmd == "start":
filename = self.client.start_logging()
print(f"✅ Logging started → {filename}")
elif subcmd == "stop":
self.client.stop_logging()
print("🛑 Logging stopped.")
elif subcmd == "status":
status = self.client.is_logging_active()
print("📊 Logging is currently", "ACTIVE ✅" if status else "INACTIVE ❌")
else:
print("⚠️ Unknown log subcommand. Use: start, stop, status")
elif cmd == "graph":
if len(parts) == 0:
print("⚠️ Usage: graph show <file> | graph compare <file1> <file2>")
return
sub = parts[0].lower()
if sub == "show" and len(parts) == 2:
self.client.visualize_csv_log(parts[1])
elif sub == "compare" and len(parts) == 3:
self.client.compare_test_runs(parts[1], parts[2])
else:
print("⚠️ Usage:\n graph show <file>\n graph compare <file1> <file2>")
elif cmd == "export" and len(parts) == 2: elif cmd == "export" and len(parts) == 2:
self.export_data(parts[1]) self.export_data(parts[1])
elif cmd == "compare" and len(parts) == 3: elif cmd == "compare" and len(parts) == 3:
@ -72,7 +105,7 @@ class RSICommandLineInterface:
elif cmd == "report" and len(parts) >= 3: elif cmd == "report" and len(parts) >= 3:
self.generate_report(parts[1], parts[2]) self.generate_report(parts[1], parts[2])
elif cmd == "exit": elif cmd == "exit":
self.stop_rsi() self.client.stop_rsi()
self.running = False self.running = False
elif cmd == "help": elif cmd == "help":
self.show_help() self.show_help()
@ -108,7 +141,7 @@ class RSICommandLineInterface:
start_dict = self.parse_pose_string(parts[1]) start_dict = self.parse_pose_string(parts[1])
end_dict = self.parse_pose_string(parts[2]) end_dict = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int) steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int)
rate = self.extract_optional_value(parts, "rate", default=0.012, cast_type=float) rate = self.extract_optional_value(parts, "rate", default=0.04, cast_type=float)
trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="cartesian") trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="cartesian")
self.client.execute_trajectory(trajectory, space="cartesian", rate=rate) self.client.execute_trajectory(trajectory, space="cartesian", rate=rate)
@ -117,7 +150,7 @@ class RSICommandLineInterface:
start_dict = self.parse_pose_string(parts[1]) start_dict = self.parse_pose_string(parts[1])
end_dict = self.parse_pose_string(parts[2]) end_dict = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int) steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int)
rate = self.extract_optional_value(parts, "rate", default=0.012, cast_type=float) rate = self.extract_optional_value(parts, "rate", default=0.4, cast_type=float)
trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="joint") trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="joint")
self.client.execute_trajectory(trajectory, space="joint", rate=rate) self.client.execute_trajectory(trajectory, space="joint", rate=rate)
@ -125,7 +158,7 @@ class RSICommandLineInterface:
start = self.parse_pose_string(parts[1]) start = self.parse_pose_string(parts[1])
end = self.parse_pose_string(parts[2]) end = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", 50, int) steps = self.extract_optional_value(parts, "steps", 50, int)
rate = self.extract_optional_value(parts, "rate", 0.012, float) rate = self.extract_optional_value(parts, "rate", 0.04, float)
traj = self.client.generate_trajectory(start, end, steps, "cartesian") traj = self.client.generate_trajectory(start, end, steps, "cartesian")
self.client.queue_trajectory(traj, "cartesian", rate) self.client.queue_trajectory(traj, "cartesian", rate)
@ -133,7 +166,7 @@ class RSICommandLineInterface:
start = self.parse_pose_string(parts[1]) start = self.parse_pose_string(parts[1])
end = self.parse_pose_string(parts[2]) end = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", 50, int) steps = self.extract_optional_value(parts, "steps", 50, int)
rate = self.extract_optional_value(parts, "rate", 0.012, float) rate = self.extract_optional_value(parts, "rate", 0.04, float)
traj = self.client.generate_trajectory(start, end, steps, "joint") traj = self.client.generate_trajectory(start, end, steps, "joint")
self.client.queue_trajectory(traj, "joint", rate) self.client.queue_trajectory(traj, "joint", rate)

View File

@ -70,20 +70,16 @@ class RSIClient:
print(f"[DEBUG] update_send_variable called with: {name} = {value}") print(f"[DEBUG] update_send_variable called with: {name} = {value}")
if "." in name: if "." in name:
parent, child = name.split(".", 1) parent, child = name.split(".", 1)
if parent in self.send_variables and isinstance(self.send_variables[parent], dict): if parent in self.send_variables:
self.send_variables[parent][child] = value current = dict(self.send_variables[parent]) # copy inner dict
print(f"[DEBUG] Current send_variables: {dict(self.send_variables)}") current[child] = float(value)
self.send_variables[parent] = current # reassign to trigger proxy update
return f"✅ Updated {name} to {value}" return f"✅ Updated {name} to {value}"
else: else:
return f"Variable '{name}' not found or not structured correctly" return f"Parent variable '{parent}' not found in send_variables"
else: else:
if name in self.send_variables: self.send_variables[name] = float(value)
self.send_variables[name] = value return f"✅ Updated {name} to {value}"
return f"✅ Updated {name} to {value}"
print(f"[DEBUG] EStr value after update: {self.send_variables.get('EStr')}")
print(f"[DEBUG] Current send_variables: {dict(self.send_variables)}")
else:
return f"❌ Variable '{name}' not found in send_variables"
def start_logging(self, filename): def start_logging(self, filename):
if hasattr(self.network_process, "start_logging"): if hasattr(self.network_process, "start_logging"):
@ -93,6 +89,9 @@ class RSIClient:
if hasattr(self.network_process, "stop_logging"): if hasattr(self.network_process, "stop_logging"):
self.network_process.stop_logging() self.network_process.stop_logging()
def is_logging_active(self):
return self.network_process.is_logging_active()
def enable_alerts(self, enable): def enable_alerts(self, enable):
if hasattr(self.network_process, "enable_alerts"): if hasattr(self.network_process, "enable_alerts"):
self.network_process.enable_alerts(enable) self.network_process.enable_alerts(enable)

View File

@ -3,6 +3,7 @@ import matplotlib.pyplot as plt
import matplotlib.animation as animation import matplotlib.animation as animation
from collections import deque from collections import deque
from .rsi_client import RSIClient from .rsi_client import RSIClient
import csv
class RSIGraphing: class RSIGraphing:
"""Handles real-time and CSV-based graphing for RSI analysis with alerts and threshold monitoring.""" """Handles real-time and CSV-based graphing for RSI analysis with alerts and threshold monitoring."""
@ -126,6 +127,33 @@ class RSIGraphing:
"""Stop live plotting loop by closing the figure.""" """Stop live plotting loop by closing the figure."""
plt.close(self.fig) plt.close(self.fig)
def plot_csv_file(csv_path):
"""Plot X/Y/Z position over time from a CSV log file."""
import matplotlib.pyplot as plt
timestamps = []
x_data, y_data, z_data = [], [], []
with open(csv_path, newline='') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
timestamps.append(row["Timestamp"])
x_data.append(float(row.get("Receive.RIst.X", 0.0)))
y_data.append(float(row.get("Receive.RIst.Y", 0.0)))
z_data.append(float(row.get("Receive.RIst.Z", 0.0)))
plt.figure(figsize=(10, 6))
plt.plot(timestamps, x_data, label="X")
plt.plot(timestamps, y_data, label="Y")
plt.plot(timestamps, z_data, label="Z")
plt.title("Position from CSV Log")
plt.xlabel("Time")
plt.ylabel("Position (mm)")
plt.xticks(rotation=45)
plt.legend()
plt.tight_layout()
plt.show()
if __name__ == "__main__": if __name__ == "__main__":
import argparse import argparse

View File

@ -1,41 +1,61 @@
import numpy as np from RSIPI.safety_manager import SafetyManager
def generate_trajectory(start, end, steps=100, space="cartesian", mode="relative", include_resets=True):
def generate_trajectory(start, end, steps=100, space="cartesian"):
""" """
Generates a linear trajectory from start to end over `steps`. Generates a trajectory from start to end in N steps.
Supported spaces: 'cartesian', 'joint'
Args:
start (dict): starting pose/position
end (dict): ending pose/position
steps (int): number of points in the trajectory
space (str): "cartesian" or "joint" (for logging/use clarity)
mode (str): "relative" or "absolute"
include_resets (bool): if True, adds reset-to-zero between pulses (only for relative mode)
Returns:
list[dict]: trajectory points
""" """
if not isinstance(start, dict) or not isinstance(end, dict): if mode not in ["relative", "absolute"]:
raise ValueError("Start and end must be dictionaries.") raise ValueError("mode must be 'relative' or 'absolute'")
if space not in ["cartesian", "joint"]:
raise ValueError("space must be 'cartesian' or 'joint'")
keys = sorted(start.keys()) axes = start.keys()
if keys != sorted(end.keys()): trajectory = []
raise ValueError("Start and end must have matching keys.")
traj = [] safety_fn = SafetyManager.check_cartesian_limits if space == "cartesian" else SafetyManager.check_joint_limits
for i in range(steps):
for i in range(1, steps + 1):
point = {} point = {}
for key in keys: for axis in axes:
point[key] = float(np.linspace(start[key], end[key], steps)[i]) delta = end[axis] - start[axis]
traj.append(point) value = start[axis] + (delta * i / steps)
return traj if mode == "relative":
point[axis] = delta / steps
else:
point[axis] = value
if enforce_safety and not safety_fn(point):
raise ValueError(f"⚠️ Safety check failed at step {i}: {point}")
def execute_trajectory(api, trajectory, space="cartesian", rate=0.012): trajectory.append(point)
if mode == "relative" and include_resets:
trajectory.append({axis: 0.0 for axis in axes})
return trajectory
def execute_trajectory(api, trajectory, space="cartesian", rate=0.004):
""" """
Sends the trajectory to the robot using the RSIPI API. Sends a trajectory by updating send variables at a fixed rate.
- space: 'cartesian' or 'joint' Use only with absolute motion or slow relative steps.
- rate: time between points in seconds (e.g., 0.012 for 12 ms)
""" """
import time
for point in trajectory: for point in trajectory:
if space == "cartesian": if space == "cartesian":
api.update_cartesian(**point) api.update_cartesian(**point)
elif space == "joint": elif space == "joint":
api.update_joints(**point) api.update_joints(**point)
else: else:
raise ValueError("Space must be 'cartesian' or 'joint'") raise ValueError("space must be 'cartesian' or 'joint'")
time.sleep(rate) time.sleep(rate)

View File

@ -3,13 +3,11 @@ import xml.etree.ElementTree as ET
class XMLGenerator: class XMLGenerator:
"""Converts send and receive variables into properly formatted XML messages.""" """Converts send and receive variables into properly formatted XML messages."""
@staticmethod
@staticmethod @staticmethod
def generate_send_xml(send_variables, network_settings): def generate_send_xml(send_variables, network_settings):
"""Generate the send XML message dynamically based on send variables.""" """Generate the send XML message dynamically based on send variables."""
root = ET.Element("Sen", Type=network_settings["sentype"]) root = ET.Element("Sen", Type=network_settings["sentype"])
print("XML 1") print("XML Send Variables : ", send_variables)
print("XML : ", send_variables)
for key, value in send_variables.items(): for key, value in send_variables.items():
if key == "FREE": if key == "FREE":
continue continue
@ -21,9 +19,6 @@ class XMLGenerator:
else: else:
ET.SubElement(root, key).text = str(value) ET.SubElement(root, key).text = str(value)
if key == "EStr":
print(f"[DEBUG] XMLGenerator sees EStr: {value}")
print("XML 2")
return ET.tostring(root, encoding="utf-8").decode() return ET.tostring(root, encoding="utf-8").decode()
@staticmethod @staticmethod
@ -32,7 +27,7 @@ class XMLGenerator:
root = ET.Element("Rob", Type="KUKA") # ✅ Root with Type="KUKA" root = ET.Element("Rob", Type="KUKA") # ✅ Root with Type="KUKA"
for key, value in receive_variables.items(): for key, value in receive_variables.items():
if isinstance(value, dict): # ✅ Handle dictionaries as elements with attributes if isinstance(value, dict) or hasattr(value, "items"): # ✅ Handle dictionaries as elements with attributes
element = ET.SubElement(root, key) element = ET.SubElement(root, key)
for sub_key, sub_value in value.items(): for sub_key, sub_value in value.items():
element.set(sub_key, f"{float(sub_value):.2f}") element.set(sub_key, f"{float(sub_value):.2f}")