All of API has been tested and is working as expeted. Fixed an error where network was always initiliasing without it being needed.

This commit is contained in:
Adam 2025-04-25 21:18:46 +01:00
parent 82b01cc7e8
commit d40ba36c74
3 changed files with 53 additions and 70 deletions

View File

@ -41,7 +41,12 @@ class KukaRSIVisualiser:
fig = plt.figure() fig = plt.figure()
ax = fig.add_subplot(111, projection='3d') ax = fig.add_subplot(111, projection='3d')
ax.plot(self.df["RIst.X"], self.df["RIst.Y"], self.df["RIst.Z"], def safe_col(name):
return name if name in self.df.columns else f"Receive.{name}"
ax.plot(self.df[safe_col("RIst.X")],
self.df[safe_col("RIst.Y")],
self.df[safe_col("RIst.Z")],
label="Actual Trajectory", linestyle='-') label="Actual Trajectory", linestyle='-')
if "RSol.X" in self.df.columns: if "RSol.X" in self.df.columns:

View File

@ -5,58 +5,12 @@ import time
import threading import threading
def generate_figure_8_z(start_xyz, radius=100, loops=1, steps=100):
trajectory = []
for i in range(steps):
t = 2 * math.pi * loops * (i / steps)
x = radius * math.sin(t)
y = radius * math.sin(t) * math.cos(t)
z = radius * math.sin(2 * t) / 2
trajectory.append({
"X": start_xyz["X"] + x,
"Y": start_xyz["Y"] + y,
"Z": start_xyz["Z"] + z
})
return trajectory
def main(): def main():
api = RSIAPI("RSI_EthernetConfig.xml") api = RSIAPI("RSI_EthernetConfig.xml")
print("🛜 Starting RSI client...") print(api.compare_test_runs("25-04-2025_16-33-47.csv", "25-04-2025_20-57-59.csv"))
api.start_rsi()
time.sleep(2)
start_xyz = {"X": 500, "Y": 0, "Z": 500}
trajectory = api.generate_trajectory(
start=start_xyz,
end=generate_figure_8_z(start_xyz, radius=100, loops=1, steps=1)[0], # endpoint unused
steps=100,
space="cartesian",
mode="absolute"
)
# Overwrite with full 8-curve, skipping internal interpolation
trajectory = generate_figure_8_z(start_xyz, radius=100, loops=1, steps=100)
print(f"✅ Generated figure-8 trajectory with {len(trajectory)} steps.")
# 🧠 Start live plotting in a thread (in mode 3d)
def start_plot():
plotter = LivePlotter(api.client, mode="3d", interval=50)
plotter.start()
threading.Thread(target=start_plot, daemon=True).start()
print("🟢 Executing trajectory...")
api.execute_trajectory(trajectory, space="cartesian", rate=0.02)
time.sleep(len(trajectory) * 0.02 + 2.0)
print("✅ Trajectory execution complete.")
print("🛑 Stopping RSI...")
api.stop_rsi()
print("✅ RSI client stopped.")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -21,7 +21,8 @@ class RSIAPI:
def __init__(self, config_file="RSI_EthernetConfig.xml"): def __init__(self, config_file="RSI_EthernetConfig.xml"):
"""Initialize RSIAPI with an RSI client instance.""" """Initialize RSIAPI with an RSI client instance."""
self.thread = None self.thread = None
self.client = RSIClient(config_file) self.config_file = config_file
self.client = None # Delay instantiation
self.graph_process = None # Store graphing process self.graph_process = None # Store graphing process
self.graphing_instance = None self.graphing_instance = None
self.graph_thread = None# self.graph_thread = None#
@ -29,8 +30,14 @@ class RSIAPI:
self.live_plotter = None self.live_plotter = None
self.live_plot_thread = None self.live_plot_thread = None
def _ensure_client(self):
"""Ensure RSIClient is initialised before use."""
if self.client is None:
from .rsi_client import RSIClient
self.client = RSIClient(self.config_file)
def start_rsi(self): def start_rsi(self):
"""Start the RSI client in a background thread.""" self._ensure_client()
self.thread = threading.Thread(target=self.client.start, daemon=True) self.thread = threading.Thread(target=self.client.start, daemon=True)
self.thread.start() self.thread.start()
return "✅ RSI started in background." return "✅ RSI started in background."
@ -275,34 +282,51 @@ class RSIAPI:
return f"{alert_type.capitalize()} alert threshold set to {value}" return f"{alert_type.capitalize()} alert threshold set to {value}"
return "❌ Invalid alert type. Use 'deviation' or 'force'." return "❌ Invalid alert type. Use 'deviation' or 'force'."
## TODO Test from here.
def generate_report(self, filename, format_type): def generate_report(self, filename, format_type):
"""Generate a statistical report from movement data.""" """
data = self.client.get_movement_data() Generate a statistical report from a CSV log file.
df = pd.DataFrame(data)
report = { Args:
"Max Position Deviation": df.iloc[:, 1:].max().to_dict(), filename (str): Path to the CSV file (or base name without .csv).
"Mean Position Deviation": df.iloc[:, 1:].mean().to_dict(), format_type (str): 'csv', 'json', or 'pdf'
"""
# Ensure filename ends with .csv
if not filename.endswith(".csv"):
filename += ".csv"
if not os.path.exists(filename):
raise FileNotFoundError(f"❌ File not found: {filename}")
df = pd.read_csv(filename)
# Only keep relevant columns (e.g. actual positions)
position_cols = [col for col in df.columns if col.startswith("Receive.RIst.")]
if not position_cols:
raise ValueError("❌ No 'Receive.RIst' position columns found in CSV.")
report_data = {
"Max Position": df[position_cols].max().to_dict(),
"Mean Position": df[position_cols].mean().to_dict(),
} }
path = f"{filename}.{format_type.lower()}" report_base = filename.replace(".csv", "")
output_path = f"{report_base}_report.{format_type.lower()}"
if format_type == "csv": if format_type == "csv":
df.to_csv(path, index=False) pd.DataFrame(report_data).T.to_csv(output_path)
elif format_type == "json": elif format_type == "json":
with open(path, "w") as f: with open(output_path, "w") as f:
f.write(json.dumps(report)) json.dump(report_data, f, indent=4)
elif format_type == "pdf": elif format_type == "pdf":
fig, ax = plt.subplots() fig, ax = plt.subplots()
df.plot(ax=ax) pd.DataFrame(report_data).T.plot(kind='bar', ax=ax)
plt.savefig(path) ax.set_title("RSI Position Report")
plt.tight_layout()
plt.savefig(output_path)
else: else:
raise ValueError(f"❌ Unsupported report format: {format_type}") raise ValueError(f"❌ Unsupported format: {format_type}")
return f"✅ Report saved as {path}"
return f"✅ Report saved as {output_path}"
@staticmethod @staticmethod
def visualise_csv_log(csv_file, export=False): def visualise_csv_log(csv_file, export=False):
@ -321,6 +345,7 @@ class RSIAPI:
if export: if export:
visualizer.export_graphs() visualizer.export_graphs()
## TODO Need to test parsing krl to csv
@staticmethod @staticmethod
def parse_krl_to_csv(src_file, dat_file, output_file): def parse_krl_to_csv(src_file, dat_file, output_file):
""" """
@ -340,6 +365,7 @@ class RSIAPI:
except Exception as e: except Exception as e:
return f"❌ Error parsing KRL files: {e}" return f"❌ Error parsing KRL files: {e}"
## TODO Need to test injecting RSI code.
@staticmethod @staticmethod
def inject_rsi(input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"): def inject_rsi(input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"):
""" """
@ -357,8 +383,6 @@ class RSIAPI:
except Exception as e: except Exception as e:
return f"❌ RSI injection failed: {e}" return f"❌ RSI injection failed: {e}"
## TODO here
@staticmethod @staticmethod
def generate_trajectory(start, end, steps=100, space="cartesian", mode="absolute", include_resets=False): def generate_trajectory(start, end, steps=100, space="cartesian", mode="absolute", include_resets=False):
"""Generates a linear trajectory (Cartesian or Joint).""" """Generates a linear trajectory (Cartesian or Joint)."""