RSI-PI/src/RSIPI/cli.py

176 lines
6.7 KiB
Python

from .rsi_client import RSIClient
from .kuka_visualizer import KukaRSIVisualizer
from .krl_parser import KRLParser
class RSICommandLineInterface:
"""Command-Line Interface for controlling RSI Client."""
def __init__(self, config_file):
"""Initialize CLI with an RSI API instance."""
self.client = RSIClient(config_file)
self.running = True
self.rsi_thread = None # Store RSI thread
self.graph_process = None # Store graphing process
def run(self):
"""Starts the CLI interaction loop."""
print("✅ RSI Command-Line Interface Started. Type 'help' for commands.")
while self.running:
command = input("RSI> ").strip().lower()
self.process_command(command)
def process_command(self, command):
"""Processes user input commands."""
parts = command.split()
if not parts:
return
cmd = parts[0]
if cmd == "start":
self.start_rsi()
elif cmd == "stop":
self.stop_rsi()
elif cmd == "set" and len(parts) >= 3:
variable, value = parts[1], " ".join(parts[2:])
self.update_variable(variable, value)
elif cmd == "alerts" and len(parts) == 2:
self.toggle_alerts(parts[1])
elif cmd == "set_alert_threshold" and len(parts) == 3:
self.set_alert_threshold(parts[1], parts[2])
elif cmd == "show":
self.show_variables()
elif cmd == "ipoc":
self.show_ipoc()
elif cmd == "watch":
self.watch_network()
elif cmd == "reset":
self.reset_variables()
elif cmd == "status":
self.show_status()
elif cmd == "reconnect":
self.reconnect()
elif cmd == "toggle" and len(parts) == 3:
self.toggle_digital_io(parts[1], parts[2])
elif cmd == "move_external" and len(parts) == 3:
self.move_external_axis(parts[1], parts[2])
elif cmd == "correct" and len(parts) == 4:
self.correct_position(parts[1], parts[2], parts[3])
elif cmd == "speed" and len(parts) == 3:
self.adjust_speed(parts[1], parts[2])
elif cmd == "override" and len(parts) == 2:
self.override_safety(parts[1])
elif cmd == "log" and len(parts) >= 2:
self.handle_logging_command(parts)
elif cmd == "graph" and len(parts) >= 2:
self.handle_graphing_command(parts)
elif cmd == "export" and len(parts) == 2:
self.export_data(parts[1])
elif cmd == "compare" and len(parts) == 3:
self.compare_test_runs(parts[1], parts[2])
elif cmd == "report" and len(parts) >= 3:
self.generate_report(parts[1], parts[2])
elif cmd == "exit":
self.stop_rsi()
self.running = False
elif cmd == "help":
self.show_help()
elif cmd == "visualize" and len(parts) >= 2:
csv_file = parts[1]
export = ("export" in parts)
self.visualize(csv_file, export)
elif cmd == "krlparse" and len(parts) == 4:
src_file, dat_file, output_file = parts[1], parts[2], parts[3]
self.krl_parse(src_file, dat_file, output_file)
else:
print("❌ Unknown command. Type 'help' for a list of commands.")
def toggle_alerts(self, state):
"""Enable or disable real-time alerts."""
if state == "on":
self.client.enable_alerts(True)
print("✅ Real-time alerts enabled.")
elif state == "off":
self.client.enable_alerts(False)
print("✅ Real-time alerts disabled.")
else:
print("❌ Invalid option. Use 'alerts on' or 'alerts off'.")
def set_alert_threshold(self, alert_type, value):
"""Set thresholds for deviation or force alerts."""
try:
value = float(value)
if alert_type in ["deviation", "force"]:
self.client.set_alert_threshold(alert_type, value)
print(f"{alert_type.capitalize()} alert threshold set to {value}")
else:
print("❌ Invalid alert type. Use 'deviation' or 'force'.")
except ValueError:
print("❌ Invalid threshold value. Enter a numeric value.")
def export_data(self, filename):
"""Export movement data to a CSV file."""
self.client.export_movement_data(filename)
print(f"✅ Data exported to {filename}")
def compare_test_runs(self, file1, file2):
"""Compare two test runs from CSV files."""
result = self.client.compare_test_runs(file1, file2)
print(result)
def generate_report(self, filename, format_type):
"""Generate a statistical report from movement data."""
if format_type not in ["csv", "json", "pdf"]:
print("❌ Invalid format. Use 'csv', 'json', or 'pdf'.")
return
self.client.generate_report(filename, format_type)
print(f"✅ Report generated: {filename}.{format_type}")
def show_help(self):
"""Displays the list of available commands."""
print("""
Available Commands:
start, stop, exit
set <var> <value>, show, ipoc, watch, reset, status, reconnect
toggle <DiO/DiL> <0/1>, move_external <axis> <value>
correct <RKorr/AKorr> <X/Y/Z/A/B/C> <value>, speed <Tech.TX> <value>
override <limit>
log start <file>.csv, log stop, log status
graph start <mode>, graph stop, graph mode <position|velocity|acceleration|force>
graph overlay on/off, graph load_plan <file>
export <filename.csv>
compare <file1.csv> <file2.csv>
report <filename> <csv|json|pdf>
alerts on/off
set_alert_threshold <deviation|force> <value>
""")
def visualize(self, csv_file, export=False):
try:
visualizer = KukaRSIVisualizer(csv_file)
visualizer.plot_trajectory()
visualizer.plot_joint_positions()
visualizer.plot_force_trends()
if export:
visualizer.export_graphs()
print(f"✅ Visualisations exported for '{csv_file}'")
except Exception as e:
print(f"❌ Failed to visualize '{csv_file}': {e}")
def krl_parse(self, src_file, dat_file, output_file):
"""CLI method to parse KRL files and output CSV."""
try:
parser = KRLParser(src_file, dat_file)
parser.parse_src()
parser.parse_dat()
parser.export_csv(output_file)
print(f"✅ KRL files parsed successfully. Output CSV: {output_file}")
except Exception as e:
print(f"❌ Failed to parse KRL files: {e}")
if __name__ == "__main__":
config_file = "RSI_EthernetConfig.xml"
cli = RSICommandLineInterface(config_file)
cli.run()