Compare commits

..

No commits in common. "a81aef45b5815d8937952f01ee482404690fdde6" and "af090b507b7c63a3a39150549d3ed8faf4358bfc" have entirely different histories.

8 changed files with 133 additions and 249 deletions

View File

@ -1,36 +1,41 @@
import time
from RSIPI.rsi_api import RSIAPI
from RSIPI.trajectory_planner import generate_trajectory
from RSIPI.rsi_graphing import plot_csv_file
def main():
api = RSIAPI("RSI_EthernetConfig.xml")
print("⚙️ Starting RSI...")
api.start_rsi()
print("📦 Starting CSV log...")
api.start_logging() # Automatically creates a timestamped file
time.sleep(1.0) # Let handshake and logging settle
# ✅ Send a few small RKorr pulses
print("🔁 Sending RKorr pulses...")
for _ in range(5):
api.update_variable("RKorr.X", 0.5)
time.sleep(0.008) # 2 cycles
api.update_variable("RKorr.X", 0.0)
time.sleep(0.008)
print("🛑 Stopping CSV log...")
api.stop_logging()
print("🛑 Stopping RSI...")
api.stop_rsi()
print("✅ Log complete. You can now visualise it with rsi_graphing or plot_csv_file().")
from src.RSIPI.rsi_client import RSIClient
from time import sleep
if __name__ == "__main__":
main()
# config_file = "RSI_EthernetConfig.xml" # Ensure this file exists in the working directory
# client = RSIClient(config_file)
# client.start()
#
# # print("done")
# #
# # # client.stop()
# sleep(5)
# print("rdfsfsdfsfsdfjsjfhakjshfd")
# client.update_send_variable("EStr", "Testing 123 Testing")
# sleep(20)
# client.stop()
from src.RSIPI.rsi_api import RSIAPI
from time import sleep
api = RSIAPI()
api.start_rsi()
sleep(4)
# Dynamically update a variable
api.update_variable("EStr", "Tessting 123")
sleep(5)
api.stop_rsi()
# from src.RSIPI.rsi_api import RSIAPI
# from time import sleep
# def main():
# api = RSIAPI()
# response = api.start_rsi()
# sleep(50)
# print(response)
#
#
#
# if __name__ == "__main__":
# main()

View File

@ -60,10 +60,14 @@ class NetworkProcess(multiprocessing.Process):
self.udp_socket.settimeout(5)
data_received, self.controller_ip_and_port = self.udp_socket.recvfrom(1024)
print("Receive: ", data_received)
print("HERE 1")
message = data_received.decode()
print("HERE 2")
self.process_received_data(message)
print("HERE 3")
print("Network :", self.send_variables)
send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings)
print("HERE 4")
print("Send:", send_xml)
self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port)

View File

@ -1,3 +1,4 @@
import multiprocessing
import pandas as pd
import numpy as np
import json
@ -7,10 +8,19 @@ from .rsi_graphing import RSIGraphing
from .kuka_visualizer import KukaRSIVisualizer
from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl
import threading
import threading # (Put this at the top of the file)
from threading import Thread
from .trajectory_planner import generate_trajectory, execute_trajectory
import datetime
def compare_test_runs(file1, file2):
"""Compare two movement logs."""
df1 = pd.read_csv(file1)
df2 = pd.read_csv(file2)
diff = abs(df1 - df2)
max_deviation = diff.max()
return f"📊 Max Deviation: {max_deviation}"
class RSIAPI:
@ -18,7 +28,6 @@ class RSIAPI:
def __init__(self, config_file="RSI_EthernetConfig.xml"):
"""Initialize RSIAPI with an RSI client instance."""
self.thread = None
self.client = RSIClient(config_file)
self.graph_process = None # Store graphing process
self.graphing_instance = None
@ -48,25 +57,6 @@ class RSIAPI:
except Exception as e:
return f"❌ Failed to update {variable}: {e}"
def show_variables(self, group: str = "all"):
"""
Prints current values of send/receive variable groups.
"""
import pprint
live_data = self.get_live_data()
if group == "all":
pprint.pprint(live_data)
else:
found = False
for key in live_data:
if key.lower() == group.lower():
pprint.pprint({key: live_data[key]})
found = True
break
if not found:
print(f"⚠️ Group '{group}' not found. Try one of: {', '.join(live_data.keys())}")
def get_variables(self):
"""Retrieve current send and receive variables."""
return {
@ -131,6 +121,10 @@ class RSIAPI:
self.client.update_send_variable(tech_param, float(value))
return f"✅ Set {tech_param} to {value}"
def override_safety(self, limit):
"""Override safety limits."""
return f"⚠️ Overriding safety limit: {limit}"
def reset_variables(self):
"""Reset send variables to default values."""
self.client.reset_send_variables()
@ -144,13 +138,11 @@ class RSIAPI:
"receive_variables": dict(self.client.receive_variables)
}
def start_logging(self, filename=None):
if not filename:
timestamp = datetime.datetime.now().strftime("%d-%m-%Y_%H-%M-%S")
filename = f"logs/{timestamp}.csv"
# ✅ CSV LOGGING METHODS
def start_logging(self, filename):
"""Start logging RSI data to CSV."""
self.client.start_logging(filename)
return filename
return f"✅ CSV Logging started: {filename}"
def stop_logging(self):
"""Stop logging RSI data."""
@ -161,6 +153,7 @@ class RSIAPI:
"""Return logging status."""
return self.client.is_logging_active()
# ✅ GRAPHING METHODS
def start_graphing(self, mode="position", overlay=False, plan_file=None):
if self.graph_thread and self.graph_thread.is_alive():
return "⚠️ Graphing is already running."
@ -184,12 +177,6 @@ class RSIAPI:
self.client.enable_alerts(enable)
return f"✅ Alerts {'enabled' if enable else 'disabled'}."
def override_safety(self, enabled: bool):
self.client.safety_manager.override_safety(enabled)
def is_safety_overridden(self) -> bool:
return self.client.safety_manager.is_safety_overridden()
def set_alert_threshold(self, alert_type, value):
"""Set threshold for deviation or force alerts."""
if alert_type in ["deviation", "force"]:
@ -372,40 +359,3 @@ class RSIAPI:
f.write("\n".join(report_lines))
return f"✅ Report generated: {output}"
def update_cartesian(self, **kwargs):
"""
Update Cartesian correction values (RKorr).
Example: update_cartesian(X=10.0, Y=0.0, Z=0.0)
"""
for axis, value in kwargs.items():
self.update_variable(f"RKorr.{axis}", float(value))
def update_joints(self, **kwargs):
for axis, value in kwargs.items():
self.update_variable(f"AKorr.{axis}", float(value))
def watch_network(self, duration: float = None, rate: float = 0.2):
"""
Continuously prints current receive variables (and IPOC).
If duration is None, runs until interrupted.
"""
import time
import datetime
print("📡 Watching network... Press Ctrl+C to stop.\n")
start_time = time.time()
try:
while True:
live_data = self.get_live_data()
ipoc = live_data.get("IPOC", "N/A")
rpos = live_data.get("RIst", {})
print(f"[{datetime.datetime.now().strftime('%H:%M:%S')}] IPOC: {ipoc} | RIst: {rpos}")
time.sleep(rate)
if duration and (time.time() - start_time) >= duration:
break
except KeyboardInterrupt:
print("\n🛑 Stopped network watch.")

View File

@ -1,4 +1,4 @@
from .rsi_api import RSIAPI
from .rsi_client import RSIClient
from .kuka_visualizer import KukaRSIVisualizer
from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl
@ -6,10 +6,12 @@ from .inject_rsi_to_krl import inject_rsi_to_krl
class RSICommandLineInterface:
"""Command-Line Interface for controlling RSI Client."""
def __init__(self, input_config_file):
def __init__(self, config_file):
"""Initialize CLI with an RSI API instance."""
self.client = RSIAPI(input_config_file)
self.client = RSIClient(config_file)
self.running = True
self.rsi_thread = None # Store RSI thread
self.graph_process = None # Store graphing process
def run(self):
"""Starts the CLI interaction loop."""
@ -27,77 +29,42 @@ class RSICommandLineInterface:
cmd = parts[0]
if cmd == "start":
self.client.start_rsi()
self.start_rsi()
elif cmd == "stop":
self.client.stop_rsi()
self.stop_rsi()
elif cmd == "set" and len(parts) >= 3:
variable, value = parts[1], " ".join(parts[2:])
self.client.update_variable(variable, value)
self.update_variable(variable, value)
elif cmd == "alerts" and len(parts) == 2:
self.toggle_alerts(parts[1])
elif cmd == "set_alert_threshold" and len(parts) == 3:
self.set_alert_threshold(parts[1], parts[2])
elif cmd == "show":
if len(parts) == 0:
group = "all"
else:
group = parts[0]
self.client.show_variables(group)
self.show_variables()
elif cmd == "ipoc":
self.client.get_ipoc()
self.show_ipoc()
elif cmd == "watch":
duration = float(parts[0]) if parts else None
self.client.watch_network(duration=duration)
self.watch_network()
elif cmd == "reset":
self.client.reset_variables()
self.reset_variables()
elif cmd == "status":
self.client.get_status()
self.show_status()
elif cmd == "reconnect":
self.client.reconnect()
self.reconnect()
elif cmd == "toggle" and len(parts) == 3:
self.client.toggle_digital_io(parts[1], parts[2])
self.toggle_digital_io(parts[1], parts[2])
elif cmd == "move_external" and len(parts) == 3:
self.client.move_external_axis(parts[1], parts[2])
self.move_external_axis(parts[1], parts[2])
elif cmd == "correct" and len(parts) == 4:
self.client.correct_position(parts[1], parts[2], parts[3])
self.correct_position(parts[1], parts[2], parts[3])
elif cmd == "speed" and len(parts) == 3:
self.client.adjust_speed(parts[1], parts[2])
self.adjust_speed(parts[1], parts[2])
elif cmd == "override" and len(parts) == 2:
self.client.override_safety(parts[1])
elif cmd == "log":
if len(parts) < 1:
print("⚠️ Usage: log start|stop|status")
return
subcmd = parts[0].lower()
if subcmd == "start":
filename = self.client.start_logging()
print(f"✅ Logging started → {filename}")
elif subcmd == "stop":
self.client.stop_logging()
print("🛑 Logging stopped.")
elif subcmd == "status":
status = self.client.is_logging_active()
print("📊 Logging is currently", "ACTIVE ✅" if status else "INACTIVE ❌")
else:
print("⚠️ Unknown log subcommand. Use: start, stop, status")
elif cmd == "graph":
if len(parts) == 0:
print("⚠️ Usage: graph show <file> | graph compare <file1> <file2>")
return
sub = parts[0].lower()
if sub == "show" and len(parts) == 2:
self.client.visualize_csv_log(parts[1])
elif sub == "compare" and len(parts) == 3:
self.client.compare_test_runs(parts[1], parts[2])
else:
print("⚠️ Usage:\n graph show <file>\n graph compare <file1> <file2>")
self.override_safety(parts[1])
elif cmd == "log" and len(parts) >= 2:
self.handle_logging_command(parts)
elif cmd == "graph" and len(parts) >= 2:
self.handle_graphing_command(parts)
elif cmd == "export" and len(parts) == 2:
self.export_data(parts[1])
elif cmd == "compare" and len(parts) == 3:
@ -105,7 +72,7 @@ class RSICommandLineInterface:
elif cmd == "report" and len(parts) >= 3:
self.generate_report(parts[1], parts[2])
elif cmd == "exit":
self.client.stop_rsi()
self.stop_rsi()
self.running = False
elif cmd == "help":
self.show_help()
@ -141,7 +108,7 @@ class RSICommandLineInterface:
start_dict = self.parse_pose_string(parts[1])
end_dict = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int)
rate = self.extract_optional_value(parts, "rate", default=0.04, cast_type=float)
rate = self.extract_optional_value(parts, "rate", default=0.012, cast_type=float)
trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="cartesian")
self.client.execute_trajectory(trajectory, space="cartesian", rate=rate)
@ -150,7 +117,7 @@ class RSICommandLineInterface:
start_dict = self.parse_pose_string(parts[1])
end_dict = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int)
rate = self.extract_optional_value(parts, "rate", default=0.4, cast_type=float)
rate = self.extract_optional_value(parts, "rate", default=0.012, cast_type=float)
trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="joint")
self.client.execute_trajectory(trajectory, space="joint", rate=rate)
@ -158,7 +125,7 @@ class RSICommandLineInterface:
start = self.parse_pose_string(parts[1])
end = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", 50, int)
rate = self.extract_optional_value(parts, "rate", 0.04, float)
rate = self.extract_optional_value(parts, "rate", 0.012, float)
traj = self.client.generate_trajectory(start, end, steps, "cartesian")
self.client.queue_trajectory(traj, "cartesian", rate)
@ -166,7 +133,7 @@ class RSICommandLineInterface:
start = self.parse_pose_string(parts[1])
end = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", 50, int)
rate = self.extract_optional_value(parts, "rate", 0.04, float)
rate = self.extract_optional_value(parts, "rate", 0.012, float)
traj = self.client.generate_trajectory(start, end, steps, "joint")
self.client.queue_trajectory(traj, "joint", rate)

View File

@ -70,16 +70,20 @@ class RSIClient:
print(f"[DEBUG] update_send_variable called with: {name} = {value}")
if "." in name:
parent, child = name.split(".", 1)
if parent in self.send_variables:
current = dict(self.send_variables[parent]) # copy inner dict
current[child] = float(value)
self.send_variables[parent] = current # reassign to trigger proxy update
if parent in self.send_variables and isinstance(self.send_variables[parent], dict):
self.send_variables[parent][child] = value
print(f"[DEBUG] Current send_variables: {dict(self.send_variables)}")
return f"✅ Updated {name} to {value}"
else:
return f"Parent variable '{parent}' not found in send_variables"
return f"Variable '{name}' not found or not structured correctly"
else:
self.send_variables[name] = float(value)
return f"✅ Updated {name} to {value}"
if name in self.send_variables:
self.send_variables[name] = value
return f"✅ Updated {name} to {value}"
print(f"[DEBUG] EStr value after update: {self.send_variables.get('EStr')}")
print(f"[DEBUG] Current send_variables: {dict(self.send_variables)}")
else:
return f"❌ Variable '{name}' not found in send_variables"
def start_logging(self, filename):
if hasattr(self.network_process, "start_logging"):
@ -89,9 +93,6 @@ class RSIClient:
if hasattr(self.network_process, "stop_logging"):
self.network_process.stop_logging()
def is_logging_active(self):
return self.network_process.is_logging_active()
def enable_alerts(self, enable):
if hasattr(self.network_process, "enable_alerts"):
self.network_process.enable_alerts(enable)

View File

@ -3,7 +3,6 @@ import matplotlib.pyplot as plt
import matplotlib.animation as animation
from collections import deque
from .rsi_client import RSIClient
import csv
class RSIGraphing:
"""Handles real-time and CSV-based graphing for RSI analysis with alerts and threshold monitoring."""
@ -127,33 +126,6 @@ class RSIGraphing:
"""Stop live plotting loop by closing the figure."""
plt.close(self.fig)
def plot_csv_file(csv_path):
"""Plot X/Y/Z position over time from a CSV log file."""
import matplotlib.pyplot as plt
timestamps = []
x_data, y_data, z_data = [], [], []
with open(csv_path, newline='') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
timestamps.append(row["Timestamp"])
x_data.append(float(row.get("Receive.RIst.X", 0.0)))
y_data.append(float(row.get("Receive.RIst.Y", 0.0)))
z_data.append(float(row.get("Receive.RIst.Z", 0.0)))
plt.figure(figsize=(10, 6))
plt.plot(timestamps, x_data, label="X")
plt.plot(timestamps, y_data, label="Y")
plt.plot(timestamps, z_data, label="Z")
plt.title("Position from CSV Log")
plt.xlabel("Time")
plt.ylabel("Position (mm)")
plt.xticks(rotation=45)
plt.legend()
plt.tight_layout()
plt.show()
if __name__ == "__main__":
import argparse

View File

@ -1,61 +1,41 @@
from RSIPI.safety_manager import SafetyManager
import numpy as np
def generate_trajectory(start, end, steps=100, space="cartesian", mode="relative", include_resets=True):
def generate_trajectory(start, end, steps=100, space="cartesian"):
"""
Generates a trajectory from start to end in N steps.
Args:
start (dict): starting pose/position
end (dict): ending pose/position
steps (int): number of points in the trajectory
space (str): "cartesian" or "joint" (for logging/use clarity)
mode (str): "relative" or "absolute"
include_resets (bool): if True, adds reset-to-zero between pulses (only for relative mode)
Returns:
list[dict]: trajectory points
Generates a linear trajectory from start to end over `steps`.
Supported spaces: 'cartesian', 'joint'
"""
if mode not in ["relative", "absolute"]:
raise ValueError("mode must be 'relative' or 'absolute'")
if space not in ["cartesian", "joint"]:
raise ValueError("space must be 'cartesian' or 'joint'")
if not isinstance(start, dict) or not isinstance(end, dict):
raise ValueError("Start and end must be dictionaries.")
axes = start.keys()
trajectory = []
keys = sorted(start.keys())
if keys != sorted(end.keys()):
raise ValueError("Start and end must have matching keys.")
safety_fn = SafetyManager.check_cartesian_limits if space == "cartesian" else SafetyManager.check_joint_limits
for i in range(1, steps + 1):
traj = []
for i in range(steps):
point = {}
for axis in axes:
delta = end[axis] - start[axis]
value = start[axis] + (delta * i / steps)
for key in keys:
point[key] = float(np.linspace(start[key], end[key], steps)[i])
traj.append(point)
if mode == "relative":
point[axis] = delta / steps
else:
point[axis] = value
return traj
if enforce_safety and not safety_fn(point):
raise ValueError(f"⚠️ Safety check failed at step {i}: {point}")
trajectory.append(point)
if mode == "relative" and include_resets:
trajectory.append({axis: 0.0 for axis in axes})
return trajectory
def execute_trajectory(api, trajectory, space="cartesian", rate=0.004):
def execute_trajectory(api, trajectory, space="cartesian", rate=0.012):
"""
Sends a trajectory by updating send variables at a fixed rate.
Use only with absolute motion or slow relative steps.
Sends the trajectory to the robot using the RSIPI API.
- space: 'cartesian' or 'joint'
- rate: time between points in seconds (e.g., 0.012 for 12 ms)
"""
import time
for point in trajectory:
if space == "cartesian":
api.update_cartesian(**point)
elif space == "joint":
api.update_joints(**point)
else:
raise ValueError("space must be 'cartesian' or 'joint'")
raise ValueError("Space must be 'cartesian' or 'joint'")
time.sleep(rate)

View File

@ -3,11 +3,13 @@ import xml.etree.ElementTree as ET
class XMLGenerator:
"""Converts send and receive variables into properly formatted XML messages."""
@staticmethod
@staticmethod
def generate_send_xml(send_variables, network_settings):
"""Generate the send XML message dynamically based on send variables."""
root = ET.Element("Sen", Type=network_settings["sentype"])
print("XML Send Variables : ", send_variables)
print("XML 1")
print("XML : ", send_variables)
for key, value in send_variables.items():
if key == "FREE":
continue
@ -19,6 +21,9 @@ class XMLGenerator:
else:
ET.SubElement(root, key).text = str(value)
if key == "EStr":
print(f"[DEBUG] XMLGenerator sees EStr: {value}")
print("XML 2")
return ET.tostring(root, encoding="utf-8").decode()
@staticmethod
@ -27,7 +32,7 @@ class XMLGenerator:
root = ET.Element("Rob", Type="KUKA") # ✅ Root with Type="KUKA"
for key, value in receive_variables.items():
if isinstance(value, dict) or hasattr(value, "items"): # ✅ Handle dictionaries as elements with attributes
if isinstance(value, dict): # ✅ Handle dictionaries as elements with attributes
element = ET.SubElement(root, key)
for sub_key, sub_value in value.items():
element.set(sub_key, f"{float(sub_value):.2f}")