RSI-PI/src/RSIPI/rsi_api.py

362 lines
13 KiB
Python

import multiprocessing
import pandas as pd
import numpy as np
import json
import matplotlib.pyplot as plt
from .rsi_client import RSIClient
from .rsi_graphing import RSIGraphing
from .kuka_visualizer import KukaRSIVisualizer
from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl
import threading # (Put this at the top of the file)
from threading import Thread
from .trajectory_planner import generate_trajectory, execute_trajectory
def compare_test_runs(file1, file2):
"""Compare two movement logs."""
df1 = pd.read_csv(file1)
df2 = pd.read_csv(file2)
diff = abs(df1 - df2)
max_deviation = diff.max()
return f"📊 Max Deviation: {max_deviation}"
class RSIAPI:
"""RSI API for programmatic control, including alerts, logging, graphing, and data retrieval."""
def __init__(self, config_file="RSI_EthernetConfig.xml"):
"""Initialize RSIAPI with an RSI client instance."""
self.client = RSIClient(config_file)
self.graph_process = None # Store graphing process
self.graphing_instance = None
self.graph_thread = None#
self.trajectory_queue = []
import threading # (Put this at the top of the file)
def start_rsi(self):
"""Start the RSI client in a background thread."""
self.thread = threading.Thread(target=self.client.start, daemon=True)
self.thread.start()
return "✅ RSI started in background."
def stop_rsi(self):
"""Stop the RSI client."""
self.client.stop()
return "✅ RSI stopped."
def update_variable(self, variable, value):
"""Dynamically update an RSI variable."""
try:
if isinstance(value, str) and value.replace('.', '', 1).isdigit():
value = float(value) if '.' in value else int(value)
self.client.update_send_variable(variable, value)
return f"✅ Updated {variable} to {value}"
except Exception as e:
return f"❌ Failed to update {variable}: {e}"
def get_variables(self):
"""Retrieve current send and receive variables."""
return {
"send_variables": dict(self.client.send_variables),
"receive_variables": dict(self.client.receive_variables)
}
def get_live_data(self):
"""Retrieve real-time RSI data for external processing."""
return {
"position": self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0}),
"velocity": self.client.receive_variables.get("Velocity", {"X": 0, "Y": 0, "Z": 0}),
"acceleration": self.client.receive_variables.get("Acceleration", {"X": 0, "Y": 0, "Z": 0}),
"force": self.client.receive_variables.get("MaCur", {"A1": 0, "A2": 0, "A3": 0, "A4": 0, "A5": 0, "A6": 0}),
"ipoc": self.client.receive_variables.get("IPOC", "N/A")
}
def get_live_data_as_numpy(self):
data = self.get_live_data()
flat = []
for section in ["position", "velocity", "acceleration", "force"]:
values = list(data[section].values())
flat.append(values)
max_len = max(len(row) for row in flat)
for row in flat:
row.extend([0] * (max_len - len(row))) # Pad missing values
return np.array(flat)
def get_live_data_as_dataframe(self):
"""Retrieve live RSI data as a Pandas DataFrame."""
data = self.get_live_data()
return pd.DataFrame([data])
def get_ipoc(self):
"""Retrieve the latest IPOC value."""
return self.client.receive_variables.get("IPOC", "N/A")
def reconnect(self):
"""Restart the network connection without stopping RSI."""
self.client.reconnect()
return "✅ Network connection restarted."
def toggle_digital_io(self, io, value):
"""Toggle digital I/O states."""
self.client.update_send_variable(io, int(value))
return f"{io} set to {value}"
def move_external_axis(self, axis, value):
"""Move an external axis."""
self.client.update_send_variable(f"ELPos.{axis}", float(value))
return f"✅ Moved {axis} to {value}"
def correct_position(self, correction_type, axis, value):
"""Apply correction to RKorr or AKorr."""
self.client.update_send_variable(f"{correction_type}.{axis}", float(value))
return f"✅ Applied correction: {correction_type}.{axis} = {value}"
def adjust_speed(self, tech_param, value):
"""Adjust speed settings."""
self.client.update_send_variable(tech_param, float(value))
return f"✅ Set {tech_param} to {value}"
def override_safety(self, limit):
"""Override safety limits."""
return f"⚠️ Overriding safety limit: {limit}"
def reset_variables(self):
"""Reset send variables to default values."""
self.client.reset_send_variables()
return "✅ Send variables reset to default values."
def get_status(self):
"""Retrieve full RSI system status."""
return {
"network": self.client.config_parser.get_network_settings(),
"send_variables": dict(self.client.send_variables),
"receive_variables": dict(self.client.receive_variables)
}
# ✅ CSV LOGGING METHODS
def start_logging(self, filename):
"""Start logging RSI data to CSV."""
self.client.start_logging(filename)
return f"✅ CSV Logging started: {filename}"
def stop_logging(self):
"""Stop logging RSI data."""
self.client.stop_logging()
return "🛑 CSV Logging stopped."
def is_logging_active(self):
"""Return logging status."""
return self.client.is_logging_active()
# ✅ GRAPHING METHODS
def start_graphing(self, mode="position", overlay=False, plan_file=None):
if self.graph_thread and self.graph_thread.is_alive():
return "⚠️ Graphing is already running."
def graph_runner():
self.graphing_instance = RSIGraphing(self.client, mode=mode, overlay=overlay, plan_file=plan_file)
self.graph_thread = Thread(target=graph_runner, daemon=True)
self.graph_thread.start()
return f"✅ Graphing started in {mode} mode"
def stop_graphing(self):
if self.graphing_instance:
self.graphing_instance.stop()
return "🛑 Graphing stopped"
return "⚠️ Graphing not running."
# ✅ ALERT METHODS
def enable_alerts(self, enable):
"""Enable or disable real-time alerts."""
self.client.enable_alerts(enable)
return f"✅ Alerts {'enabled' if enable else 'disabled'}."
def set_alert_threshold(self, alert_type, value):
"""Set threshold for deviation or force alerts."""
if alert_type in ["deviation", "force"]:
self.client.set_alert_threshold(alert_type, value)
return f"{alert_type.capitalize()} alert threshold set to {value}"
return "❌ Invalid alert type. Use 'deviation' or 'force'."
# ✅ DATA EXPORT & ANALYSIS
def export_movement_data(self, filename):
"""Export movement data to a CSV file."""
data = self.client.get_movement_data()
df = pd.DataFrame(data)
df.to_csv(filename, index=False)
return f"✅ Data exported to {filename}"
def generate_report(self, filename, format_type):
"""Generate a statistical report from movement data."""
data = self.client.get_movement_data()
df = pd.DataFrame(data)
report = {
"Max Position Deviation": df.iloc[:, 1:].max().to_dict(),
"Mean Position Deviation": df.iloc[:, 1:].mean().to_dict(),
}
if format_type == "csv":
df.to_csv(f"{filename}.csv", index=False)
elif format_type == "json":
with open(f"{filename}.json", "w") as f:
json.dump(report, f)
elif format_type == "pdf":
fig, ax = plt.subplots()
df.plot(ax=ax)
plt.savefig(f"{filename}.pdf")
return f"✅ Report saved as {filename}.{format_type}"
def visualize_csv_log(self, csv_file, export=False):
"""
Visualize CSV log file directly via RSIAPI.
Args:
csv_file (str): Path to CSV log file.
export (bool): Whether to export the plots.
"""
visualizer = KukaRSIVisualizer(csv_file)
visualizer.plot_trajectory()
visualizer.plot_joint_positions()
visualizer.plot_force_trends()
if export:
visualizer.export_graphs()
def parse_krl_to_csv(self, src_file, dat_file, output_file):
"""
Parses KRL files (.src, .dat) and exports coordinates to CSV.
Args:
src_file (str): Path to KRL .src file.
dat_file (str): Path to KRL .dat file.
output_file (str): Path for output CSV file.
"""
try:
parser = KRLParser(src_file, dat_file)
parser.parse_src()
parser.parse_dat()
parser.export_csv(output_file)
return f"✅ KRL data successfully exported to {output_file}"
except Exception as e:
return f"❌ Error parsing KRL files: {e}"
def inject_rsi(self, input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"):
"""
Inject RSI commands into a KRL (.src) program file.
Args:
input_krl (str): Path to the input KRL file.
output_krl (str, optional): Path to the output file (defaults to overwriting input).
rsi_config (str, optional): RSI configuration file name.
"""
try:
inject_rsi_to_krl(input_krl, output_krl, rsi_config)
output_path = output_krl if output_krl else input_krl
return f"✅ RSI successfully injected into {output_path}"
except Exception as e:
return f"❌ RSI injection failed: {e}"
def generate_trajectory(self, start, end, steps=100, space="cartesian"):
"""Generates a linear trajectory (Cartesian or Joint)."""
return generate_trajectory(start, end, steps, space)
def execute_trajectory(self, trajectory, space="cartesian", rate=0.012):
"""Executes a trajectory using live updates."""
return execute_trajectory(self, trajectory, space, rate)
def queue_trajectory(self, trajectory, space="cartesian", rate=0.012):
"""Adds a trajectory to the internal queue."""
self.trajectory_queue.append({
"trajectory": trajectory,
"space": space,
"rate": rate,
})
def clear_trajectory_queue(self):
"""Clears all queued trajectories."""
self.trajectory_queue.clear()
def get_trajectory_queue(self):
"""Returns current queued trajectories (metadata only)."""
return [
{"space": item["space"], "steps": len(item["trajectory"]), "rate": item["rate"]}
for item in self.trajectory_queue
]
def execute_queued_trajectories(self):
"""Executes all queued trajectories in order."""
for item in self.trajectory_queue:
self.execute_trajectory(item["trajectory"], item["space"], item["rate"])
self.clear_trajectory_queue()
def export_movement_data(self, filename="movement_log.csv"):
"""
Exports recorded movement data (if available) to a CSV file.
Assumes self.client.logger has stored entries.
"""
if not hasattr(self.client, "logger") or self.client.logger is None:
raise RuntimeError("No logger attached to RSI client.")
data = self.client.get_movement_data()
if not data:
raise RuntimeError("No data available to export.")
import pandas as pd
df = pd.DataFrame(data)
df.to_csv(filename, index=False)
return f"✅ Movement data exported to {filename}"
def compare_test_runs(self, file1, file2):
"""
Compares two test run CSV files.
Returns a summary of average and max deviation for each axis.
"""
import pandas as pd
df1 = pd.read_csv(file1)
df2 = pd.read_csv(file2)
shared_cols = [col for col in df1.columns if col in df2.columns and col.startswith("Receive.RIst")]
diffs = {}
for col in shared_cols:
delta = abs(df1[col] - df2[col])
diffs[col] = {
"mean_diff": delta.mean(),
"max_diff": delta.max(),
}
return diffs
def generate_report(self, file, output="report.txt"):
"""
Generates a summary report from a CSV log (e.g., trajectory stats).
"""
import pandas as pd
df = pd.read_csv(file)
position_cols = [col for col in df.columns if "Receive.RIst" in col or "Send.RKorr" in col]
report_lines = []
report_lines.append(f"📄 RSIPI Movement Report")
report_lines.append(f"Source File: {file}")
report_lines.append(f"Total entries: {len(df)}\n")
for col in position_cols:
stats = df[col].describe()
report_lines.append(
f"{col}: mean={stats['mean']:.2f}, std={stats['std']:.2f}, min={stats['min']:.2f}, max={stats['max']:.2f}")
with open(output, "w") as f:
f.write("\n".join(report_lines))
return f"✅ Report generated: {output}"