150 lines
5.5 KiB
Python
150 lines
5.5 KiB
Python
import sys
|
|
import os
|
|
import threading
|
|
import multiprocessing
|
|
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
|
|
|
|
from src.RSIPI.rsi_client import RSIClient
|
|
from src.RSIPI.graphing import RSIGraphing
|
|
|
|
|
|
class RSICommandLineInterface:
    """Interactive command-line front end for an ``RSIClient``.

    Reads one command per line from standard input, splits it on whitespace
    and dispatches it to a handler method on this object.

    Only the command word and keyword-style arguments (graph sub-commands,
    graph modes, ``on``/``off``, report formats, log sub-commands) are
    case-folded. Variable names, I/O names and file names are passed through
    exactly as typed, because the help text documents mixed-case identifiers
    such as ``RKorr`` and ``DiO``.

    NOTE(review): several handlers referenced by the dispatch tables
    (``start_rsi``, ``stop_rsi``, ``update_variable`` and others) are not
    defined in the visible part of this class. They are looked up by name at
    call time, so a missing handler produces an error message instead of an
    ``AttributeError``; confirm where they are meant to be implemented.
    """

    # Commands that take no arguments (extra words are ignored), mapped to
    # the name of the handler method.
    _SIMPLE_COMMANDS = {
        "start": "start_rsi",
        "stop": "stop_rsi",
        "show": "show_variables",
        "ipoc": "show_ipoc",
        "watch": "watch_network",
        "reset": "reset_variables",
        "status": "show_status",
        "reconnect": "reconnect",
        "help": "show_help",
    }

    # Commands that require an exact number of arguments, mapped to
    # (handler method name, required argument count).
    _FIXED_ARITY_COMMANDS = {
        "toggle": ("toggle_digital_io", 2),
        "move_external": ("move_external_axis", 2),
        "correct": ("correct_position", 3),
        "speed": ("adjust_speed", 2),
        "override": ("override_safety", 1),
        "export": ("export_data", 1),
        "compare": ("compare_test_runs", 2),
    }

    def __init__(self, config_file):
        """Create the underlying RSI client and reset the CLI state.

        Args:
            config_file: Value passed unchanged to ``RSIClient``.
        """
        self.client = RSIClient(config_file)
        self.running = True  # run() loops while this is true
        self.rsi_thread = None  # Store RSI thread
        self.graph_process = None  # Store graphing process

    def run(self):
        """Prompt for commands until ``exit`` is processed.

        End-of-input (Ctrl-D / closed stdin) and Ctrl-C are treated as an
        ``exit`` command so the normal stop path runs instead of the loop
        dying with a traceback.
        """
        print("✅ RSI Command-Line Interface Started. Type 'help' for commands.")
        while self.running:
            try:
                command = input("RSI> ").strip()
            except (EOFError, KeyboardInterrupt):
                print()  # finish the interrupted prompt line
                command = "exit"
            self.process_command(command)

    def _invoke(self, handler_name, *args):
        """Call the handler method called *handler_name* if it exists.

        A missing handler is reported to the user rather than raising
        ``AttributeError``.
        """
        handler = getattr(self, handler_name, None)
        if handler is None:
            print(f"❌ Command not available: handler '{handler_name}' is not implemented.")
            return
        handler(*args)

    def process_command(self, command):
        """Parse one command line and dispatch it.

        Blank input is ignored. An unrecognised command word, or a known one
        with the wrong number of arguments, prints the "Unknown command"
        message.

        Args:
            command: Raw command line; the command word is case-insensitive.
        """
        parts = command.split()
        if not parts:
            return

        cmd = parts[0].lower()
        args = parts[1:]

        if cmd in self._SIMPLE_COMMANDS:
            self._invoke(self._SIMPLE_COMMANDS[cmd])
            return

        fixed = self._FIXED_ARITY_COMMANDS.get(cmd)
        if fixed is not None and len(args) == fixed[1]:
            self._invoke(fixed[0], *args)
        elif cmd == "set" and len(args) >= 2:
            # Everything after the variable name is the value.
            self._invoke("update_variable", args[0], " ".join(args[1:]))
        elif cmd == "log" and args:
            # Fold only the sub-command; a log file name keeps its case.
            self._invoke("handle_logging_command", [cmd, args[0].lower(), *args[1:]])
        elif cmd == "graph" and args:
            self.handle_graphing_command([cmd, *args])
        elif cmd == "report" and len(args) >= 2:
            self.generate_report(args[0], args[1])
        elif cmd == "exit":
            try:
                self._invoke("stop_rsi")
            finally:
                # Leave the loop even if stopping failed.
                self.running = False
        else:
            print("❌ Unknown command. Type 'help' for a list of commands.")

    def export_data(self, filename):
        """Export movement data to a CSV file.

        Delegates to ``client.export_movement_data`` and prints a
        confirmation.
        """
        self.client.export_movement_data(filename)
        print(f"✅ Data exported to {filename}")

    def compare_test_runs(self, file1, file2):
        """Compare two test runs from CSV files and print the client's result."""
        result = self.client.compare_test_runs(file1, file2)
        print(result)

    def generate_report(self, filename, format_type):
        """Generate a statistical report from movement data.

        Args:
            filename: Passed to the client; also shown in the confirmation.
            format_type: ``csv``, ``json`` or ``pdf`` (case-insensitive).
                Any other value prints an error and nothing is generated.
        """
        format_type = format_type.lower()
        if format_type not in ("csv", "json", "pdf"):
            print("❌ Invalid format. Use 'csv', 'json', or 'pdf'.")
            return
        self.client.generate_report(filename, format_type)
        print(f"✅ Report generated: {filename}.{format_type}")

    def handle_graphing_command(self, parts):
        """Handle the ``graph`` family of commands.

        Args:
            parts: Tokenised command, ``parts[0]`` being ``"graph"`` and
                ``parts[1]`` the sub-command. A list without a sub-command
                prints the usage message.
        """
        subcmd = parts[1].lower() if len(parts) > 1 else ""

        if subcmd == "start":
            # The mode is optional and defaults to "position".
            mode = parts[2].lower() if len(parts) > 2 else "position"
            self._invoke("start_graphing", mode)
        elif subcmd == "stop":
            self._invoke("stop_graphing")
        elif subcmd == "mode" and len(parts) == 3:
            self._invoke("change_graph_mode", parts[2].lower())
        elif subcmd == "overlay" and len(parts) == 3:
            option = parts[2].lower()
            if option == "on":
                self.client.enable_graph_overlay(True)
                print("✅ Overlay enabled.")
            elif option == "off":
                self.client.enable_graph_overlay(False)
                print("✅ Overlay disabled.")
            else:
                print("❌ Invalid option. Use 'graph overlay on/off'.")
        elif subcmd == "load_plan" and len(parts) == 3:
            # File name is passed through with its original case.
            self.client.load_planned_trajectory(parts[2])
            print(f"✅ Loaded planned trajectory: {parts[2]}")
        else:
            print("❌ Invalid graph command. Use 'graph start', 'graph stop', 'graph mode <mode>', 'graph overlay on/off', 'graph load_plan <file>'.")

    def show_help(self):
        """Displays the list of available commands."""
        print("""
Available Commands:
start, stop, exit
set <var> <value>, show, ipoc, watch, reset, status, reconnect
toggle <DiO/DiL> <0/1>, move_external <axis> <value>
correct <RKorr/AKorr> <X/Y/Z/A/B/C> <value>, speed <Tech.TX> <value>
override <limit>
log start <file>.csv, log stop, log status
graph start <mode>, graph stop, graph mode <position|velocity|acceleration|force>
graph overlay on/off, graph load_plan <file>
export <filename.csv>
compare <file1.csv> <file2.csv>
report <filename> <csv|json|pdf>
""")
|
|
|
|
|
|
if __name__ == "__main__":
    # An optional first command-line argument selects the RSI configuration
    # file; with no argument the original default file name is used.
    config_file = sys.argv[1] if len(sys.argv) > 1 else "RSI_EthernetConfig.xml"
    cli = RSICommandLineInterface(config_file)
    cli.run()
|