from .rsi_api import RSIAPI
from .kuka_visualiser import KukaRSIVisualiser
from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl


class RSICommandLineInterface:
    """Command-Line Interface for controlling RSI Client.

    Reads one command per line from stdin, splits it on whitespace and
    forwards it to the wrapped ``RSIAPI`` instance (``self.client``).
    """

    def __init__(self, input_config_file):
        """Initialize CLI with an RSI API instance.

        Args:
            input_config_file: Path handed unchanged to ``RSIAPI``.
        """
        self.client = RSIAPI(input_config_file)
        self.running = True

    def run(self):
        """Start the interactive loop; it ends once ``self.running`` is False."""
        print("✅ RSI Command-Line Interface Started. Type 'help' for commands.")
        while self.running:
            # Do NOT lowercase the whole line: file paths and variable names
            # (e.g. RKorr.X) are case-sensitive. process_command() case-folds
            # only the command word and keyword-style arguments.
            command = input("RSI> ").strip()
            self.process_command(command)

    def process_command(self, command):
        """Parse one command line and dispatch it.

        The command word and keyword-style arguments (on/off, sub-commands,
        group names, formats) are matched case-insensitively. Variable names,
        values and file paths are passed on exactly as typed.

        Args:
            command: Raw command line; blank input is ignored.
        """
        parts = command.split()
        if not parts:
            return

        cmd = parts[0].lower()
        argc = len(parts)

        if cmd == "start":
            self.client.start_rsi()
        elif cmd == "stop":
            self.client.stop_rsi()
        elif cmd == "set" and argc >= 3:
            variable, value = parts[1], " ".join(parts[2:])
            self.client.update_variable(variable, value)
        elif cmd == "alerts" and argc == 2:
            self.toggle_alerts(parts[1].lower())
        elif cmd == "set_alert_threshold" and argc == 3:
            self.set_alert_threshold(parts[1].lower(), parts[2])
        elif cmd == "show":
            self._handle_show(parts)
        elif cmd == "ipoc":
            self.client.get_ipoc()
        elif cmd == "watch":
            self._handle_watch(parts)
        elif cmd == "reset":
            self.client.reset_variables()
        elif cmd == "status":
            self.client.show_config_file()
        elif cmd == "reconnect":
            self.client.reconnect()
        elif cmd == "toggle" and argc == 3:
            self.client.toggle_digital_io(parts[1], parts[2])
        elif cmd == "move_external" and argc == 3:
            self.client.move_external_axis(parts[1], parts[2])
        elif cmd == "correct" and argc == 4:
            self.client.correct_position(parts[1], parts[2], parts[3])
        elif cmd == "speed" and argc == 3:
            self.client.adjust_speed(parts[1], parts[2])
        elif cmd == "override" and argc == 2:
            # Kept lowercased, as this argument always was before.
            self.client.override_safety(parts[1].lower())
        elif cmd == "log":
            self._handle_log(parts)
        elif cmd == "graph":
            self._handle_graph(parts)
        elif cmd == "export" and argc == 2:
            self.export_data(parts[1])
        elif cmd == "compare" and argc == 3:
            self.compare_test_runs(parts[1], parts[2])
        elif cmd == "report" and argc >= 3:
            self.generate_report(parts[1], parts[2].lower())
        elif cmd == "exit":
            self.client.stop_rsi()
            self.running = False
        elif cmd == "help":
            self.show_help()
        elif cmd == "visualize" and argc >= 2:
            csv_file = parts[1]
            # Look for the flag only after the file name, so a file that
            # happens to be called "export" does not switch exporting on.
            export = any(token.lower() == "export" for token in parts[2:])
            self.visualize(csv_file, export)
        elif cmd == "krlparse" and argc == 4:
            src_file, dat_file, output_file = parts[1], parts[2], parts[3]
            self.krl_parse(src_file, dat_file, output_file)
        elif cmd == "inject_rsi" and argc >= 2:
            input_krl = parts[1]
            output_krl = parts[2] if argc >= 3 else None
            rsi_config = parts[3] if argc >= 4 else "RSIGatewayv1.rsi"
            self.inject_rsi(input_krl, output_krl, rsi_config)
        elif cmd == "move_cartesian" and argc >= 3:
            self._handle_trajectory(parts, "cartesian", 0.04, queue=False)
        elif cmd == "move_joint" and argc >= 3:
            # NOTE(review): 0.4 differs from the 0.04 used by every other
            # trajectory command - possibly a typo; left as found.
            self._handle_trajectory(parts, "joint", 0.4, queue=False)
        elif cmd == "queue_cartesian" and argc >= 3:
            self._handle_trajectory(parts, "cartesian", 0.04, queue=True)
        elif cmd == "queue_joint" and argc >= 3:
            self._handle_trajectory(parts, "joint", 0.04, queue=True)
        elif cmd == "execute_queue":
            self.client.execute_queued_trajectories()
        elif cmd == "clear_queue":
            self.client.clear_trajectory_queue()
        elif cmd == "show_queue":
            queue = self.client.get_trajectory_queue()
            print(f"🧾 Trajectory Queue: {len(queue)} items")
            for index, entry in enumerate(queue, start=1):
                print(f" {index}. {entry['space']} | {entry['steps']} steps | {entry['rate']}s")
        elif cmd == "export_movement_data" and argc == 2:
            result = self.client.export_movement_data(parts[1])
            print(result)
        elif cmd == "compare_test_runs" and argc == 3:
            result = self.client.compare_test_runs(parts[1], parts[2])
            print("📊 Comparison Results:")
            for key, stats in result.items():
                print(f"{key}: mean_diff={stats['mean_diff']:.3f}, max_diff={stats['max_diff']:.3f}")
        elif cmd == "generate_report" and argc in (2, 3):
            output = parts[2] if argc == 3 else "report.txt"
            result = self.client.generate_report(parts[1], output)
            print(result)
        elif cmd == "safety-stop":
            self.client.safety_manager.emergency_stop()
            print("🛑 Safety: Emergency Stop activated.")
        elif cmd == "safety-reset":
            self.client.safety_manager.reset_stop()
            print("✅ Safety: Emergency Stop reset. Motion allowed.")
        elif cmd == "safety-status":
            sm = self.client.safety_manager
            print("🧱 Safety Status: " + ("STOPPED" if sm.is_stopped() else "ACTIVE"))
            print("📏 Enforced Limits:")
            for var, (lo, hi) in sm.get_limits().items():
                print(f" {var}: {lo} → {hi}")
        elif cmd == "plot":
            if argc < 3:
                print("⚠️ Usage: plot <type> <csv_path> [overlay_path]")
                return
            plot_type = parts[1].lower()
            csv_path = parts[2]
            overlay_path = parts[3] if argc >= 4 else None
            result = self.client.generate_plot(csv_path, plot_type=plot_type, overlay_path=overlay_path)
            print(result)
        elif cmd == "safety-set-limit" and argc == 4:
            var, lo, hi = parts[1], parts[2], parts[3]
            try:
                lo = float(lo)
                hi = float(hi)
                self.client.safety_manager.set_limit(var, lo, hi)
                print(f"✅ Set limit for {var}: {lo} to {hi}")
            except ValueError:
                print("❌ Invalid numbers for limit. Usage: safety-set-limit RKorr.X -5 5")
        else:
            print("❌ Unknown command. Type 'help' for a list of commands.")

    def _handle_show(self, parts):
        """Handle ``show [group]`` and ``show live``."""
        # parts[0] is the command word itself; the group is the next token.
        group = parts[1].lower() if len(parts) > 1 else "all"
        if group == "live":
            data = self.client.get_live_data()
            print("📡 Live Data:")
            for key, value in data.items():
                print(f" {key}: {value}")
            return
        self.client.show_variables(group)

    def _handle_watch(self, parts):
        """Handle ``watch [seconds]``; no argument passes ``duration=None``."""
        try:
            duration = float(parts[1]) if len(parts) > 1 else None
        except ValueError:
            print("⚠️ Usage: watch [duration_seconds]")
            return
        self.client.watch_network(duration=duration)

    def _handle_log(self, parts):
        """Handle ``log start|stop|status``."""
        if len(parts) < 2:
            print("⚠️ Usage: log start|stop|status")
            return
        subcmd = parts[1].lower()
        if subcmd == "start":
            filename = self.client.start_logging()
            print(f"✅ Logging started → {filename}")
        elif subcmd == "stop":
            self.client.stop_logging()
            print("🛑 Logging stopped.")
        elif subcmd == "status":
            status = self.client.is_logging_active()
            print("📊 Logging is currently", "ACTIVE ✅" if status else "INACTIVE ❌")
        else:
            print("⚠️ Unknown log subcommand. Use: start, stop, status")

    def _handle_graph(self, parts):
        """Handle ``graph show <csv>`` and ``graph compare <a> <b>``."""
        usage = "⚠️ Usage:\n graph show <csv_file>\n graph compare <file1> <file2>"
        if len(parts) < 2:
            print(usage)
            return
        sub = parts[1].lower()
        if sub == "show" and len(parts) == 3:
            self.client.visualise_csv_log(parts[2])
        elif sub == "compare" and len(parts) == 4:
            self.client.compare_test_runs(parts[2], parts[3])
        else:
            print(usage)

    def _handle_trajectory(self, parts, space, default_rate, queue):
        """Build a trajectory from ``<start> <end> [steps=N] [rate=R]``.

        Args:
            parts: Tokenised command; poses are at index 1 and 2.
            space: ``"cartesian"`` or ``"joint"``, forwarded to the client.
            default_rate: Rate used when no ``rate=`` token is given.
            queue: True to queue the trajectory, False to execute it now.
        """
        try:
            start = self.parse_pose_string(parts[1])
            end = self.parse_pose_string(parts[2])
        except ValueError as exc:
            # Report a bad pose instead of letting it end the CLI loop.
            print(f"❌ Invalid pose: {exc}")
            return
        steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int)
        rate = self.extract_optional_value(parts, "rate", default=default_rate, cast_type=float)
        trajectory = self.client.generate_trajectory(start, end, steps=steps, space=space)
        if queue:
            self.client.queue_trajectory(trajectory, space, rate)
        else:
            self.client.execute_trajectory(trajectory, space=space, rate=rate)

    def parse_pose_string(self, pose_string):
        """Parse ``"X=0,Y=0,Z=500"`` into ``{"X": 0.0, "Y": 0.0, "Z": 500.0}``.

        Keys keep the case the user typed.

        Raises:
            ValueError: If an item is not ``KEY=VALUE`` or the value is not
                a number.
        """
        pose = {}
        for item in pose_string.split(","):
            name, sep, raw = item.partition("=")
            name = name.strip()
            if not sep or not name:
                raise ValueError(f"expected KEY=VALUE, got '{item}'")
            pose[name] = float(raw)
        return pose

    def toggle_alerts(self, state):
        """Enable or disable real-time alerts (``state`` is 'on' or 'off')."""
        if state == "on":
            self.client.enable_alerts(True)
            print("✅ Real-time alerts enabled.")
        elif state == "off":
            self.client.enable_alerts(False)
            print("✅ Real-time alerts disabled.")
        else:
            print("❌ Invalid option. Use 'alerts on' or 'alerts off'.")

    def set_alert_threshold(self, alert_type, value):
        """Set the 'deviation' or 'force' alert threshold to a numeric value."""
        try:
            value = float(value)
            if alert_type in ["deviation", "force"]:
                self.client.set_alert_threshold(alert_type, value)
                print(f"✅ {alert_type.capitalize()} alert threshold set to {value}")
            else:
                print("❌ Invalid alert type. Use 'deviation' or 'force'.")
        except ValueError:
            print("❌ Invalid threshold value. Enter a numeric value.")

    def export_data(self, filename):
        """Export movement data to a CSV file."""
        self.client.export_movement_data(filename)
        print(f"✅ Data exported to {filename}")

    def compare_test_runs(self, file1, file2):
        """Compare two test runs from CSV files and print the client's result."""
        result = self.client.compare_test_runs(file1, file2)
        print(result)

    def generate_report(self, filename, format_type):
        """Generate a report; ``format_type`` must be 'csv', 'json' or 'pdf'."""
        if format_type not in ["csv", "json", "pdf"]:
            print("❌ Invalid format. Use 'csv', 'json', or 'pdf'.")
            return
        self.client.generate_report(filename, format_type)
        print(f"✅ Report generated: {filename}.{format_type}")

    def show_help(self):
        """Display the commands accepted by ``process_command``."""
        lines = (
            "",
            "Available Commands:",
            "  start, stop, exit",
            "  set <var> <value>, show [group], ipoc, watch [seconds], reset, status, reconnect",
            "  toggle <io> <0/1>, move_external <axis> <value>",
            "  correct <type> <axis> <value>, speed <param> <value>",
            "  override <state>",
            "  log start, log stop, log status",
            "  graph show <csv_file>, graph compare <file1> <file2>",
            "  plot <type> <csv_path> [overlay_path]",
            "  export <filename>",
            "  compare <file1> <file2>",
            "  report <filename> <csv|json|pdf>",
            "  alerts on/off",
            "  set_alert_threshold <deviation|force> <value>",
            "  visualize <csv_file> [export]",
            "  krlparse <src_file> <dat_file> <output_csv>",
            "  inject_rsi <input_krl> [output_krl] [rsi_config]",
            "  show all - Show all current send and receive variables",
            "  show live - Show real-time TCP, force, and IPOC values",
            "  log status - Display whether logging is currently active",
            "  move_cartesian <start> <end> [steps=50] [rate=0.04]",
            "      e.g., X=0,Y=0,Z=500 X=100,Y=0,Z=500 steps=100 rate=0.012",
            "  move_joint <start> <end> [steps=50] [rate=0.4]",
            "      e.g., A1=0,... A1=90,... steps=60",
            "  queue_cartesian <start> <end> [steps=50] [rate=0.04] - Queue linear Cartesian trajectory",
            "  queue_joint <start> <end> [steps=50] [rate=0.04] - Queue linear Joint trajectory",
            "  show_queue - Show queued trajectory segments",
            "  clear_queue - Clear all queued trajectories",
            "  execute_queue - Execute all queued motions",
            "  export_movement_data <csv_file> - Export logged motion data to CSV",
            "  compare_test_runs <file1> <file2> - Compare 2 test logs (e.g. deviation)",
            "  generate_report <csv_file> [out.txt] - Create a movement analysis report",
            "  safety-stop, safety-reset, safety-status",
            "  safety-set-limit <var> <lo> <hi>",
            "",
        )
        print("\n".join(lines))

    def visualize(self, csv_file, export=False):
        """Plot trajectory, joint positions and force trends from a CSV log.

        Args:
            csv_file: Path of the CSV log to visualise.
            export: When true, also export the graphs.
        """
        try:
            visualizer = KukaRSIVisualiser(csv_file)
            visualizer.plot_trajectory()
            visualizer.plot_joint_positions()
            visualizer.plot_force_trends()
            if export:
                visualizer.export_graphs()
                print(f"✅ Visualisations exported for '{csv_file}'")
        except Exception as e:
            # Top-level CLI boundary: report the failure and keep the loop alive.
            print(f"❌ Failed to visualize '{csv_file}': {e}")

    def krl_parse(self, src_file, dat_file, output_file):
        """CLI method to parse KRL files and output CSV."""
        try:
            parser = KRLParser(src_file, dat_file)
            parser.parse_src()
            parser.parse_dat()
            parser.export_csv(output_file)
            print(f"✅ KRL files parsed successfully. Output CSV: {output_file}")
        except Exception as e:
            print(f"❌ Failed to parse KRL files: {e}")

    def inject_rsi(self, input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"):
        """Inject RSI commands into a KRL file via CLI.

        The success message names ``output_krl`` when given, otherwise
        ``input_krl``.
        """
        try:
            inject_rsi_to_krl(input_krl, output_krl, rsi_config)
            output_path = output_krl if output_krl else input_krl
            print(f"✅ RSI commands successfully injected into '{output_path}'")
        except Exception as e:
            print(f"❌ Failed to inject RSI commands: {e}")

    def extract_optional_value(self, parts, key, default=0, cast_type=float):
        """Extract an optional ``key=value`` argument such as ``steps=100``.

        Only tokens after the command word and the two pose arguments are
        searched; the key is matched case-insensitively.

        Returns:
            The value converted with ``cast_type``, or ``default`` when the
            key is absent or the value cannot be converted.
        """
        prefix = f"{key}=".lower()
        for part in parts[3:]:  # skip cmd, start, end
            if part.lower().startswith(prefix):
                try:
                    return cast_type(part.split("=")[1])
                except ValueError:
                    return default
        return default


if __name__ == "__main__":
    config_file = "RSI_EthernetConfig.xml"
    cli = RSICommandLineInterface(config_file)
    cli.run()