diff --git a/src/RSIPI/main.py b/src/RSIPI/main.py index 13c094e..1da21f4 100644 --- a/src/RSIPI/main.py +++ b/src/RSIPI/main.py @@ -1,87 +1,66 @@ import time import threading +from winreg import REG_CREATED_NEW_KEY + from RSIPI.rsi_api import RSIAPI - -def generate_pulsed_trajectory(total_mm=1.0, step_mm=0.01): - """ - Creates a pulsed trajectory that moves a total distance by sending small RKorr deltas - followed by resets to zero. - """ - steps = int(total_mm / step_mm) - trajectory = [] - for _ in range(steps): - trajectory.append({"X": step_mm}) # Pulse - trajectory.append({"X": 0.0}) # Reset - return trajectory - - -def feed_pulsed_trajectory(api, trajectory): - """ - Sends each RKorr step synchronised with IPOC changes. - """ - stop_event = threading.Event() - - def feeder(): - last_ipoc = api.get_ipoc() - index = 0 - - while index < len(trajectory) and not stop_event.is_set(): - current_ipoc = api.get_ipoc() - - if current_ipoc != last_ipoc: - last_ipoc = current_ipoc - val = trajectory[index].get("X", 0.0) - api.update_cartesian(X=val) - index += 1 - - time.sleep(0.0005) - - stop_event.set() - - thread = threading.Thread(target=feeder) - thread.start() - return stop_event, thread - - -def reset_rkorr(api): - for axis in ["X", "Y", "Z", "A", "B", "C"]: - api.update_variable(f"RKorr.{axis}", 0.0) - - def main(): + from src.RSIPI.rsi_api import RSIAPI + api = RSIAPI("RSI_EthernetConfig.xml") - print("⚙️ Starting RSI...") - api.start_rsi() - time.sleep(1.0) - # Create trajectories - forward = generate_pulsed_trajectory(total_mm=500.0, step_mm=0.01) - backward = generate_pulsed_trajectory(total_mm=-500.0, step_mm=-0.01) + print(api.set_alert_threshold("deviation", 1.0)) + print(api.set_alert_threshold("force", 5.0)) + print(api.enable_alerts(True)) - loop_count = 5 - print(f"🔁 Starting looped motion (±1 mm), {loop_count} cycles") + api.client.safety_manager.set_limit("RKorr.X", -2.0, 2.0) + print("✅ Limit set for RKorr.X: -2.0 to 2.0") - for i in range(loop_count): - print(f"➡️ Loop {i+1}: Forward") - fwd_done, _ = feed_pulsed_trajectory(api, forward) - while not fwd_done.is_set(): - time.sleep(0.005) + # === Try safe update === + try: + result = api.update_variable("RKorr.X", 1.0) + print(result) + print("✅ Safe value accepted.") + except Exception as e: + print(f"❌ Safe value was rejected unexpectedly: {e}") - reset_rkorr(api) + # === Try unsafe update === + try: + result = api.update_variable("RKorr.X", 5.0) + print(result) + print("❌ Unsafe value was accepted unexpectedly!") + except Exception as e: + print(f"✅ Correctly blocked unsafe value: {e}") - print(f"⬅️ Loop {i+1}: Backward") - bwd_done, _ = feed_pulsed_trajectory(api, backward) - while not bwd_done.is_set(): - time.sleep(0.005) + # === Emergency Stop === + api.client.safety_manager.emergency_stop() + try: + api.update_variable("RKorr.X", 0.0) + print("❌ Update passed during E-STOP (this is wrong)") + except Exception as e: + print(f"✅ Correctly blocked by E-STOP: {e}") - reset_rkorr(api) + # === Reset E-Stop === + api.client.safety_manager.reset_stop() + try: + result = api.update_variable("RKorr.X", 0.0) + print(result) + print("✅ Motion resumed after E-STOP reset.") + except Exception as e: + print(f"❌ Unexpected error after E-STOP reset: {e}") - print("✅ Motion complete. RSI still running.") + # === Enable Override === + api.override_safety(True) + try: + result = api.update_variable("RKorr.X", 999.0) + print(result) + print("✅ Override allowed unsafe motion as expected.") + except Exception as e: + print(f"❌ Override failed: {e}") - print("✅ Motion complete. RSI still running. Waiting for operator or Ctrl+C...") - while True: - time.sleep(1) + # === Check override status === + print("🛡️ Override status:", "ACTIVE" if api.is_safety_overridden() else "INACTIVE") + api.stop_rsi() if __name__ == "__main__": main() diff --git a/src/RSIPI/network_handler.py b/src/RSIPI/network_handler.py index d96a102..3e52697 100644 --- a/src/RSIPI/network_handler.py +++ b/src/RSIPI/network_handler.py @@ -60,12 +60,12 @@ class NetworkProcess(multiprocessing.Process): try: self.udp_socket.settimeout(5) data_received, self.controller_ip_and_port = self.udp_socket.recvfrom(1024) - print("Receive: ", data_received) + #print("Receive: ", data_received) message = data_received.decode() self.process_received_data(message) - print("Network :", self.send_variables) + #print("Network :", self.send_variables) send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings) - print("Send:", send_xml) + #print("Send:", send_xml) self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port) # ✅ If logging is active, write data to CSV @@ -105,42 +105,54 @@ class NetworkProcess(multiprocessing.Process): with open(filename, mode="a", newline="") as file: writer = csv.writer(file) - # Write header if the file is new + # Write header if new if file.tell() == 0: headers = ["Timestamp", "IPOC"] - headers += [f"Send.{k}" for k in self.send_variables.keys()] - headers += [f"Receive.{k}" for k in self.receive_variables.keys()] - headers += ["SafetyViolation"] + for k, v in self.send_variables.items(): + if isinstance(v, dict): + headers += [f"Send.{k}.{subk}" for subk in v.keys()] + else: + headers.append(f"Send.{k}") + for k, v in self.receive_variables.items(): + if isinstance(v, dict): + headers += [f"Receive.{k}.{subk}" for subk in v.keys()] + else: + headers.append(f"Receive.{k}") + headers.append("SafetyViolation") writer.writerow(headers) - # Gather values + # Gather values safely timestamp = time.strftime("%d-%m-%Y %H:%M:%S") ipoc = self.receive_variables.get("IPOC", 0) - send_data = [self.send_variables.get(k, "") for k in self.send_variables.keys()] - receive_data = [self.receive_variables.get(k, "") for k in self.receive_variables.keys()] - # 🔴 Check for safety violations - violation = False - for var in self.send_variables: - value = self.send_variables[var] - # Check structured variables - if isinstance(value, dict): - for subkey, subval in value.items(): - path = f"{var}.{subkey}" - try: - self.safety_manager.validate(path, subval) - except Exception as e: - violation = str(e) - break + send_data = [] + for k, v in self.send_variables.items(): + if isinstance(v, dict): + send_data.extend([v.get(subk, "") for subk in v]) else: - try: - self.safety_manager.validate(var, value) - except Exception as e: - violation = str(e) - break + send_data.append(v) - writer.writerow([timestamp, ipoc] + send_data + receive_data + [violation or "False"]) + receive_data = [] + for k, v in self.receive_variables.items(): + if isinstance(v, dict): + receive_data.extend([v.get(subk, "") for subk in v]) + else: + receive_data.append(v) + # Safety check + violation = "False" + try: + for var, val in self.send_variables.items(): + if isinstance(val, dict): + for sub, subval in val.items(): + path = f"{var}.{sub}" + self.safety_manager.validate(path, subval) + else: + self.safety_manager.validate(var, val) + except Exception as e: + violation = str(e) + + writer.writerow([timestamp, ipoc] + send_data + receive_data + [violation]) except Exception as e: print(f"[ERROR] Failed to log data to CSV: {e}") diff --git a/src/RSIPI/rsi_api.py b/src/RSIPI/rsi_api.py index 0db38ed..e37a12b 100644 --- a/src/RSIPI/rsi_api.py +++ b/src/RSIPI/rsi_api.py @@ -3,15 +3,16 @@ import numpy as np import json import matplotlib.pyplot as plt from .rsi_client import RSIClient -from .rsi_graphing import RSIGraphing -from .kuka_visualiser import Kukarsivisualiser +from .kuka_visualiser import KukaRSIVisualiser from .krl_to_csv_parser import KRLParser from .inject_rsi_to_krl import inject_rsi_to_krl import threading -from threading import Thread from .trajectory_planner import generate_trajectory, execute_trajectory import datetime - +from src.RSIPI.static_plotter import StaticPlotter # Make sure this file exists as described +import os +from src.RSIPI.live_plotter import LivePlotter +from threading import Thread class RSIAPI: """RSI API for programmatic control, including alerts, logging, graphing, and data retrieval.""" @@ -24,7 +25,8 @@ class RSIAPI: self.graphing_instance = None self.graph_thread = None# self.trajectory_queue = [] - + self.live_plotter = None + self.live_plot_thread = None def start_rsi(self): """Start the RSI client in a background thread.""" @@ -37,41 +39,46 @@ class RSIAPI: self.client.stop() return "✅ RSI stopped." - def update_variable(self, variable, value): - """Dynamically update an RSI variable.""" - try: - if isinstance(value, str) and str(value).replace('.', '', 1).isdigit(): - value = float(value) if '.' in value else int(value) - self.client.update_send_variable(variable, value) - return f"✅ Updated {variable} to {value}" - except Exception as e: - return f"❌ Failed to update {variable}: {e}" - - def show_variables(self, group: str = "all"): - """ - Prints current values of send/receive variable groups. - """ - import pprint - live_data = self.get_live_data() - - if group == "all": - pprint.pprint(live_data) + def update_variable(self, name, value): + if "." in name: + parent, child = name.split(".", 1) + full_path = f"{parent}.{child}" + if parent in self.client.send_variables: + current = dict(self.client.send_variables[parent]) + # 🛡️ Validate using SafetyManager + safe_value = self.client.safety_manager.validate(full_path, float(value)) + current[child] = safe_value + self.client.send_variables[parent] = current + return f"✅ Updated {name} to {safe_value}" + else: + raise KeyError(f"❌ Parent variable '{parent}' not found in send_variables") else: - found = False - for key in live_data: - if key.lower() == group.lower(): - pprint.pprint({key: live_data[key]}) - found = True - break - if not found: - print(f"⚠️ Group '{group}' not found. Try one of: {', '.join(live_data.keys())}") + safe_value = self.client.safety_manager.validate(name, float(value)) + self.client.send_variables[name] = safe_value + return f"✅ Updated {name} to {safe_value}" - def get_variables(self): - """Retrieve current send and receive variables.""" - return { - "send_variables": dict(self.client.send_variables), - "receive_variables": dict(self.client.receive_variables) - } + def show_variables(self): + """Print available variable names in send and receive variables.""" + def format_grouped(var_dict): + output = [] + for var, val in var_dict.items(): + if isinstance(val, dict): + sub_vars = ', '.join(val.keys()) + output.append(f"{var}: {sub_vars}") + else: + output.append(var) + return output + + send_vars = format_grouped(self.client.send_variables) + receive_vars = format_grouped(self.client.receive_variables) + + print("Send Variables:") + for item in send_vars: + print(f" - {item}") + + print("\nReceive Variables:") + for item in receive_vars: + print(f" - {item}") def get_live_data(self): """Retrieve real-time RSI data for external processing.""" @@ -110,37 +117,45 @@ class RSIAPI: self.client.reconnect() return "✅ Network connection restarted." - def toggle_digital_io(self, io, value): - """Toggle digital I/O states.""" - self.client.update_send_variable(io, int(value)) - return f"✅ {io} set to {value}" + def toggle_digital_io(self, io_group, io_name, state): + """ + Toggle a digital IO variable. + + Args: + io_group (str): Parent variable group (e.g., 'Digout', 'DiO', 'DiL') + io_name (str): IO name or number within the group (e.g., 'o1', '1') + state (bool | int): Desired state (True/False or 1/0) + + Returns: + str: Success or failure message. + """ + var_name = f"{io_group}.{io_name}" + state_value = int(bool(state)) # ensures it's either 1 or 0 + return self.update_variable(var_name, state_value) def move_external_axis(self, axis, value): """Move an external axis.""" - self.client.update_send_variable(f"ELPos.{axis}", float(value)) - return f"✅ Moved {axis} to {value}" + return self.update_variable(f"ELPos.{axis}", value) def correct_position(self, correction_type, axis, value): """Apply correction to RKorr or AKorr.""" - self.client.update_send_variable(f"{correction_type}.{axis}", float(value)) - return f"✅ Applied correction: {correction_type}.{axis} = {value}" + return self.update_variable(f"{correction_type}.{axis}", value) def adjust_speed(self, tech_param, value): - """Adjust speed settings.""" - self.client.update_send_variable(tech_param, float(value)) - return f"✅ Set {tech_param} to {value}" + """Adjust speed settings (e.g., Tech.T21).""" + return self.update_variable(tech_param, value) def reset_variables(self): """Reset send variables to default values.""" self.client.reset_send_variables() return "✅ Send variables reset to default values." - def get_status(self): - """Retrieve full RSI system status.""" + def show_config_file(self): + """Retrieve key information from config file.""" return { - "network": self.client.config_parser.get_network_settings(), - "send_variables": dict(self.client.send_variables), - "receive_variables": dict(self.client.receive_variables) + "Network": self.client.config_parser.get_network_settings(), + "Send variables": dict(self.client.send_variables), + "Receive variables": dict(self.client.receive_variables) } def start_logging(self, filename=None): @@ -154,28 +169,91 @@ class RSIAPI: def stop_logging(self): """Stop logging RSI data.""" self.client.stop_logging() - return "🛑 CSV Logging stopped." + return "CSV Logging stopped." def is_logging_active(self): """Return logging status.""" return self.client.is_logging_active() - def start_graphing(self, mode="position", overlay=False, plan_file=None): - if self.graph_thread and self.graph_thread.is_alive(): - return "⚠️ Graphing is already running." + @staticmethod + def generate_plot(csv_path: str, plot_type: str = "3d", overlay_path: str = None): + """ + Generate a static plot based on RSI CSV data. - def graph_runner(): - self.graphing_instance = RSIGraphing(self.client, mode=mode, overlay=overlay, plan_file=plan_file) + Args: + csv_path (str): Path to the CSV log file. + plot_type (str): Type of plot to generate. Options: + - "3d", "2d_xy", "2d_xz", "2d_yz" + - "position", "velocity", "acceleration" + - "joints", "force", "deviation" + overlay_path (str): Optional CSV file for planned trajectory (used in "deviation" plots). - self.graph_thread = Thread(target=graph_runner, daemon=True) - self.graph_thread.start() - return f"✅ Graphing started in {mode} mode" + Returns: + str: Status message indicating plot success or failure. + """ + if not os.path.exists(csv_path): + return f"❌ CSV file not found: {csv_path}" + + try: + plot_type = plot_type.lower() + + match plot_type: + case "3d": + StaticPlotter.plot_3d_trajectory(csv_path) + case "2d_xy": + StaticPlotter.plot_2d_projection(csv_path, plane="xy") + case "2d_xz": + StaticPlotter.plot_2d_projection(csv_path, plane="xz") + case "2d_yz": + StaticPlotter.plot_2d_projection(csv_path, plane="yz") + case "position": + StaticPlotter.plot_position_vs_time(csv_path) + case "velocity": + StaticPlotter.plot_velocity_vs_time(csv_path) + case "acceleration": + StaticPlotter.plot_acceleration_vs_time(csv_path) + case "joints": + StaticPlotter.plot_joint_angles(csv_path) + case "force": + StaticPlotter.plot_motor_currents(csv_path) + case "deviation": + if overlay_path is None or not os.path.exists(overlay_path): + return "❌ Deviation plot requires a valid overlay CSV file." + StaticPlotter.plot_deviation(csv_path, overlay_path) + case _: + return f"❌ Invalid plot type '{plot_type}'. Use one of: 3d, 2d_xy, 2d_xz, 2d_yz, position, velocity, acceleration, joints, force, deviation." + + return f"✅ Plot '{plot_type}' generated successfully." + except Exception as e: + return f"❌ Failed to generate plot '{plot_type}': {str(e)}" + + + + def start_live_plot(self, mode="3d", interval=100): + if self.live_plotter and self.live_plotter.running: + return "⚠️ Live plotting already active." + + def runner(): + self.live_plotter = LivePlotter(self.client, mode=mode, interval=interval) + self.live_plotter.start() + + self.live_plot_thread = Thread(target=runner, daemon=True) + self.live_plot_thread.start() + return f"📈 Live plot started in '{mode}' mode at {interval}ms interval." + + def stop_live_plot(self): + if self.live_plotter and self.live_plotter.running: + self.live_plotter.stop() + return "🛑 Live plotting stopped." + return "⚠️ No live plot is currently running." + + def change_live_plot_mode(self, mode): + if self.live_plotter and self.live_plotter.running: + self.live_plotter.change_mode(mode) + return f"🔄 Live plot mode changed to '{mode}'." + return "⚠️ No live plot is active to change mode." + - def stop_graphing(self): - if self.graphing_instance: - self.graphing_instance.stop() - return "🛑 Graphing stopped" - return "⚠️ Graphing not running." # ✅ ALERT METHODS def enable_alerts(self, enable): @@ -196,6 +274,8 @@ class RSIAPI: return f"✅ {alert_type.capitalize()} alert threshold set to {value}" return "❌ Invalid alert type. Use 'deviation' or 'force'." + ## TODO Testing to here. + def generate_report(self, filename, format_type): """Generate a statistical report from movement data.""" data = self.client.get_movement_data() @@ -232,7 +312,7 @@ class RSIAPI: csv_file (str): Path to CSV log file. export (bool): Whether to export the plots. """ - visualizer = Kukarsivisualiser(csv_file) + visualizer = KukaRSIVisualiser(csv_file) visualizer.plot_trajectory() visualizer.plot_joint_positions() visualizer.plot_force_trends() diff --git a/src/RSIPI/rsi_cli.py b/src/RSIPI/rsi_cli.py index 0c9f324..ce057b8 100644 --- a/src/RSIPI/rsi_cli.py +++ b/src/RSIPI/rsi_cli.py @@ -1,5 +1,5 @@ from .rsi_api import RSIAPI -from .kuka_visualiser import Kukarsivisualiser +from .kuka_visualiser import KukaRSIVisualiser from .krl_to_csv_parser import KRLParser from .inject_rsi_to_krl import inject_rsi_to_krl @@ -51,7 +51,7 @@ class RSICommandLineInterface: elif cmd == "reset": self.client.reset_variables() elif cmd == "status": - self.client.get_status() + self.client.show_config_file() elif cmd == "reconnect": self.client.reconnect() elif cmd == "toggle" and len(parts) == 3: @@ -122,7 +122,7 @@ class RSICommandLineInterface: rsi_config = parts[3] if len(parts) == 4 else "RSIGatewayv1.rsi" self.inject_rsi(input_krl, output_krl, rsi_config) elif cmd == "show" and len(parts) == 2 and parts[1] == "all": - variables = self.client.get_variables() + variables = self.client.show_variables() print("📤 Send Variables:") for k, v in variables["send_variables"].items(): print(f" {k}: {v}") @@ -207,7 +207,17 @@ class RSICommandLineInterface: print("📏 Enforced Limits:") for var, (lo, hi) in sm.get_limits().items(): print(f" {var}: {lo} → {hi}") + elif cmd == "plot" and len(parts) >= 2: + plot_type = parts[1] + if len(parts) < 3: + print("⚠️ Usage: plot [overlay_path]") + return + csv_path = parts[2] + overlay_path = parts[3] if len(parts) >= 4 else None + + result = self.client.generate_plot(csv_path, plot_type=plot_type, overlay_path=overlay_path) + print(result) elif cmd == "safety-set-limit" and len(parts) == 4: var, lo, hi = parts[1], parts[2], parts[3] try: @@ -262,6 +272,7 @@ class RSICommandLineInterface: self.client.generate_report(filename, format_type) print(f"✅ Report generated: {filename}.{format_type}") + def show_help(self): """Displays the list of available commands.""" print(""" diff --git a/src/RSIPI/safety_manager.py b/src/RSIPI/safety_manager.py index 0284e85..07357b8 100644 --- a/src/RSIPI/safety_manager.py +++ b/src/RSIPI/safety_manager.py @@ -24,22 +24,13 @@ class SafetyManager: self.limits = limits if limits is not None else {} self.e_stop = False self.last_values = {} # Reserved for future tracking or override detection + self.override = False # ➡️ Track if safety checks are overridden def validate(self, path: str, value: float) -> float: - """ - Validates an individual correction value. + if self.override: + # Bypass all safety checks when override is active + return value - Args: - path (str): Key of the variable, e.g., "RKorr.X" - value (float): Proposed value to apply - - Returns: - float: Value if valid - - Raises: - RuntimeError: If emergency stop is active - ValueError: If value exceeds allowed bounds - """ if self.e_stop: logging.warning(f"SafetyManager: {path} update blocked (E-STOP active)") raise RuntimeError(f"SafetyManager: E-STOP active. Motion blocked for {path}.") @@ -71,3 +62,11 @@ class SafetyManager: def is_stopped(self): """Returns whether the emergency stop is active.""" return self.e_stop + + def override_safety(self, enable: bool): + """Enable or disable safety override (bypass all checks).""" + self.override = enable + + def is_safety_overridden(self) -> bool: + """Returns whether safety override is active.""" + return self.override \ No newline at end of file diff --git a/src/RSIPI/static_plotter.py b/src/RSIPI/static_plotter.py new file mode 100644 index 0000000..7c207e9 --- /dev/null +++ b/src/RSIPI/static_plotter.py @@ -0,0 +1,158 @@ +# Re-execute since code state was reset +static_plotter_path = "/mnt/data/static_plotter.py" + +import csv +import matplotlib.pyplot as plt +from mpl_toolkits.mplot3d import Axes3D + +class StaticPlotter: + + @staticmethod + def _load_csv(csv_path): + data = { + "time": [], + "x": [], "y": [], "z": [], + "vx": [], "vy": [], "vz": [], + "ax": [], "ay": [], "az": [], + "joints": {f"A{i}": [] for i in range(1, 7)}, + "force": {f"A{i}": [] for i in range(1, 7)} + } + + with open(csv_path, newline='') as f: + reader = csv.DictReader(f) + for row in reader: + data["time"].append(row.get("Timestamp", "")) + data["x"].append(float(row.get("Receive.RIst.X", 0))) + data["y"].append(float(row.get("Receive.RIst.Y", 0))) + data["z"].append(float(row.get("Receive.RIst.Z", 0))) + for i in range(1, 7): + data["joints"][f"A{i}"].append(float(row.get(f"Receive.AIPos.A{i}", 0))) + data["force"][f"A{i}"].append(float(row.get(f"Receive.MaCur.A{i}", 0))) + return data + + @staticmethod + def plot_3d_trajectory(csv_path): + data = StaticPlotter._load_csv(csv_path) + fig = plt.figure() + ax = fig.add_subplot(111, projection='3d') + ax.plot(data["x"], data["y"], data["z"], label="TCP Path") + ax.set_xlabel("X [mm]") + ax.set_ylabel("Y [mm]") + ax.set_zlabel("Z [mm]") + ax.set_title("3D TCP Trajectory") + ax.legend() + plt.tight_layout() + plt.show() + + @staticmethod + def plot_2d_projection(csv_path, plane="xy"): + data = StaticPlotter._load_csv(csv_path) + x, y = { + "xy": (data["x"], data["y"]), + "xz": (data["x"], data["z"]), + "yz": (data["y"], data["z"]), + }.get(plane, (data["x"], data["y"])) + plt.plot(x, y) + plt.title(f"2D Trajectory Projection ({plane.upper()})") + plt.xlabel(f"{plane[0].upper()} [mm]") + plt.ylabel(f"{plane[1].upper()} [mm]") + plt.grid(True) + plt.tight_layout() + plt.show() + + @staticmethod + def plot_position_vs_time(csv_path): + data = StaticPlotter._load_csv(csv_path) + plt.plot(data["time"], data["x"], label="X") + plt.plot(data["time"], data["y"], label="Y") + plt.plot(data["time"], data["z"], label="Z") + plt.title("TCP Position vs Time") + plt.xlabel("Time") + plt.ylabel("Position [mm]") + plt.legend() + plt.xticks(rotation=45) + plt.tight_layout() + plt.show() + + @staticmethod + def plot_joint_angles(csv_path): + data = StaticPlotter._load_csv(csv_path) + for joint, values in data["joints"].items(): + plt.plot(data["time"], values, label=joint) + plt.title("Joint Angles vs Time") + plt.xlabel("Time") + plt.ylabel("Angle [deg]") + plt.legend() + plt.xticks(rotation=45) + plt.tight_layout() + plt.show() + + @staticmethod + def plot_motor_currents(csv_path): + data = StaticPlotter._load_csv(csv_path) + for joint, values in data["force"].items(): + plt.plot(data["time"], values, label=joint) + plt.title("Motor Current (Torque Proxy) vs Time") + plt.xlabel("Time") + plt.ylabel("Current [Nm]") + plt.legend() + plt.xticks(rotation=45) + plt.tight_layout() + plt.show() + + @staticmethod + def plot_velocity_vs_time(csv_path): + data = StaticPlotter._load_csv(csv_path) + vx = [0] + [(data["x"][i] - data["x"][i - 1]) for i in range(1, len(data["x"]))] + vy = [0] + [(data["y"][i] - data["y"][i - 1]) for i in range(1, len(data["y"]))] + vz = [0] + [(data["z"][i] - data["z"][i - 1]) for i in range(1, len(data["z"]))] + plt.plot(data["time"], vx, label="dX/dt") + plt.plot(data["time"], vy, label="dY/dt") + plt.plot(data["time"], vz, label="dZ/dt") + plt.title("Velocity vs Time") + plt.xlabel("Time") + plt.ylabel("Velocity [mm/s]") + plt.legend() + plt.xticks(rotation=45) + plt.tight_layout() + plt.show() + + @staticmethod + def plot_acceleration_vs_time(csv_path): + data = StaticPlotter._load_csv(csv_path) + vx = [0] + [(data["x"][i] - data["x"][i - 1]) for i in range(1, len(data["x"]))] + vy = [0] + [(data["y"][i] - data["y"][i - 1]) for i in range(1, len(data["y"]))] + vz = [0] + [(data["z"][i] - data["z"][i - 1]) for i in range(1, len(data["z"]))] + ax = [0] + [(vx[i] - vx[i - 1]) for i in range(1, len(vx))] + ay = [0] + [(vy[i] - vy[i - 1]) for i in range(1, len(vy))] + az = [0] + [(vz[i] - vz[i - 1]) for i in range(1, len(vz))] + plt.plot(data["time"], ax, label="d²X/dt²") + plt.plot(data["time"], ay, label="d²Y/dt²") + plt.plot(data["time"], az, label="d²Z/dt²") + plt.title("Acceleration vs Time") + plt.xlabel("Time") + plt.ylabel("Acceleration [mm/s²]") + plt.legend() + plt.xticks(rotation=45) + plt.tight_layout() + plt.show() + + @staticmethod + def plot_deviation(csv_actual, csv_planned): + actual = StaticPlotter._load_csv(csv_actual) + planned = StaticPlotter._load_csv(csv_planned) + deviation = { + "x": [abs(a - b) for a, b in zip(actual["x"], planned["x"])], + "y": [abs(a - b) for a, b in zip(actual["y"], planned["y"])], + "z": [abs(a - b) for a, b in zip(actual["z"], planned["z"])] + } + plt.plot(actual["time"], deviation["x"], label="X Deviation") + plt.plot(actual["time"], deviation["y"], label="Y Deviation") + plt.plot(actual["time"], deviation["z"], label="Z Deviation") + plt.title("Deviation (Actual - Planned) vs Time") + plt.xlabel("Time") + plt.ylabel("Deviation [mm]") + plt.legend() + plt.xticks(rotation=45) + plt.tight_layout() + plt.show() diff --git a/src/RSIPI/test_rsipi.py b/src/RSIPI/test_rsipi.py index e24cbf0..b14a832 100644 --- a/src/RSIPI/test_rsipi.py +++ b/src/RSIPI/test_rsipi.py @@ -73,7 +73,7 @@ class TestRSIPI(unittest.TestCase): self.assertIn("✅ Send variables reset to default values", response) def test_get_status(self): - status = self.api.get_status() + status = self.api.show_config_file() self.assertIn("network", status) self.assertIn("send_variables", status) self.assertIn("receive_variables", status) @@ -148,7 +148,7 @@ class TestRSIPI(unittest.TestCase): def test_get_variables(self): """Test retrieval of full send and receive variable dictionaries.""" - variables = self.api.get_variables() + variables = self.api.show_variables() self.assertIn("send_variables", variables) self.assertIn("receive_variables", variables) self.assertIsInstance(variables["send_variables"], dict)