Fixed warnings in rsi_api

This commit is contained in:
Adam 2025-04-05 01:47:33 +01:00
parent a81aef45b5
commit ec26ff3175
4 changed files with 18 additions and 44 deletions

View File

@ -25,7 +25,6 @@ class RSIAPI:
self.graph_thread = None#
self.trajectory_queue = []
import threading # (Put this at the top of the file)
def start_rsi(self):
"""Start the RSI client in a background thread."""
@ -41,7 +40,7 @@ class RSIAPI:
def update_variable(self, variable, value):
"""Dynamically update an RSI variable."""
try:
if isinstance(value, str) and value.replace('.', '', 1).isdigit():
if isinstance(value, str) and str(value).replace('.', '', 1).isdigit():
value = float(value) if '.' in value else int(value)
self.client.update_send_variable(variable, value)
return f"✅ Updated {variable} to {value}"
@ -197,14 +196,6 @@ class RSIAPI:
return f"{alert_type.capitalize()} alert threshold set to {value}"
return "❌ Invalid alert type. Use 'deviation' or 'force'."
# ✅ DATA EXPORT & ANALYSIS
def export_movement_data(self, filename):
"""Export movement data to a CSV file."""
data = self.client.get_movement_data()
df = pd.DataFrame(data)
df.to_csv(filename, index=False)
return f"✅ Data exported to {filename}"
def generate_report(self, filename, format_type):
"""Generate a statistical report from movement data."""
data = self.client.get_movement_data()
@ -215,19 +206,25 @@ class RSIAPI:
"Mean Position Deviation": df.iloc[:, 1:].mean().to_dict(),
}
path = f"{filename}.{format_type.lower()}"
if format_type == "csv":
df.to_csv(f"{filename}.csv", index=False)
df.to_csv(path, index=False)
elif format_type == "json":
with open(f"{filename}.json", "w") as f:
json.dump(report, f)
with open(path, "w") as f:
f.write(json.dumps(report))
elif format_type == "pdf":
fig, ax = plt.subplots()
df.plot(ax=ax)
plt.savefig(f"{filename}.pdf")
plt.savefig(path)
else:
raise ValueError(f"❌ Unsupported report format: {format_type}")
return f"✅ Report saved as {filename}.{format_type}"
return f"✅ Report saved as {path}"
def visualize_csv_log(self, csv_file, export=False):
@staticmethod
def visualise_csv_log(csv_file, export=False):
"""
Visualize CSV log file directly via RSIAPI.
@ -322,12 +319,12 @@ class RSIAPI:
if not data:
raise RuntimeError("No data available to export.")
import pandas as pd
df = pd.DataFrame(data)
df.to_csv(filename, index=False)
return f"✅ Movement data exported to {filename}"
def compare_test_runs(self, file1, file2):
@staticmethod
def compare_test_runs(file1, file2):
"""
Compares two test run CSV files.
Returns a summary of average and max deviation for each axis.
@ -349,30 +346,6 @@ class RSIAPI:
return diffs
def generate_report(self, file, output="report.txt"):
"""
Generates a summary report from a CSV log (e.g., trajectory stats).
"""
import pandas as pd
df = pd.read_csv(file)
position_cols = [col for col in df.columns if "Receive.RIst" in col or "Send.RKorr" in col]
report_lines = []
report_lines.append(f"📄 RSIPI Movement Report")
report_lines.append(f"Source File: {file}")
report_lines.append(f"Total entries: {len(df)}\n")
for col in position_cols:
stats = df[col].describe()
report_lines.append(
f"{col}: mean={stats['mean']:.2f}, std={stats['std']:.2f}, min={stats['min']:.2f}, max={stats['max']:.2f}")
with open(output, "w") as f:
f.write("\n".join(report_lines))
return f"✅ Report generated: {output}"
def update_cartesian(self, **kwargs):
"""
Update Cartesian correction values (RKorr).

View File

@ -91,7 +91,7 @@ class RSICommandLineInterface:
sub = parts[0].lower()
if sub == "show" and len(parts) == 2:
self.client.visualize_csv_log(parts[1])
self.client.visualise_csv_log(parts[1])
elif sub == "compare" and len(parts) == 3:
self.client.compare_test_runs(parts[1], parts[2])

View File

@ -37,6 +37,7 @@ class RSIClient:
)
self.network_process.start()
self.logger = None # Placeholder for logging module
def start(self):
"""Keep the client running and allow periodic debugging."""

View File

@ -97,7 +97,7 @@ class TestRSIPI(unittest.TestCase):
}).to_csv(csv_file, index=False)
try:
self.api.visualize_csv_log(csv_file, export=True)
self.api.visualise_csv_log(csv_file, export=True)
except Exception as e:
self.fail(f"Visualisation test failed: {e}")
finally: