From 87f0a7cc8c006cf44b1cc6262bbc3c87ba20ed29 Mon Sep 17 00:00:00 2001 From: Adam Date: Sat, 15 Mar 2025 22:25:23 +0000 Subject: [PATCH] updated api and cli commands --- src/RSIPI/cli.py | 145 ++++++++++++++++++++++++++++++++++--- src/RSIPI/config_parser.py | 63 ++++++++-------- src/RSIPI/graphing.py | 2 +- src/RSIPI/main.py | 22 +++++- src/RSIPI/rsi_api.py | 131 +++++++++++++++++---------------- 5 files changed, 254 insertions(+), 109 deletions(-) diff --git a/src/RSIPI/cli.py b/src/RSIPI/cli.py index cd3dd92..5dc7987 100644 --- a/src/RSIPI/cli.py +++ b/src/RSIPI/cli.py @@ -1,14 +1,19 @@ import sys -import time -from src.RSIPI.rsi_api import RSIAPI +import os +import threading + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))) + +from src.RSIPI.rsi_client import RSIClient class RSICommandLineInterface: """Command-Line Interface for controlling RSI Client.""" def __init__(self, config_file): """Initialize CLI with an RSI API instance.""" - self.api = RSIAPI(config_file) + self.client = RSIClient(config_file) self.running = True + self.rsi_thread = None # Store RSI thread def run(self): """Starts the CLI interaction loop.""" @@ -26,25 +31,145 @@ class RSICommandLineInterface: cmd = parts[0] if cmd == "start": - self.api.start() + self.start_rsi() elif cmd == "stop": - self.api.stop() + self.stop_rsi() + elif cmd == "set" and len(parts) >= 3: + variable, value = parts[1], " ".join(parts[2:]) + self.update_variable(variable, value) + elif cmd == "show": + self.show_variables() + elif cmd == "ipoc": + self.show_ipoc() + elif cmd == "watch": + self.watch_network() + elif cmd == "reset": + self.reset_variables() + elif cmd == "status": + self.show_status() + elif cmd == "reconnect": + self.reconnect() + elif cmd == "toggle" and len(parts) == 3: + self.toggle_digital_io(parts[1], parts[2]) + elif cmd == "move_external" and len(parts) == 3: + self.move_external_axis(parts[1], parts[2]) + elif cmd == "correct" and len(parts) == 4: + self.correct_position(parts[1], parts[2], parts[3]) + elif cmd == "speed" and len(parts) == 3: + self.adjust_speed(parts[1], parts[2]) + elif cmd == "override" and len(parts) == 2: + self.override_safety(parts[1]) elif cmd == "exit": - self.api.stop() + self.stop_rsi() self.running = False elif cmd == "help": self.show_help() else: print("❌ Unknown command. Type 'help' for a list of commands.") + def start_rsi(self): + """Start RSI in a separate thread to prevent CLI from freezing.""" + if self.rsi_thread and self.rsi_thread.is_alive(): + print("⚠️ RSI is already running.") + return + + self.rsi_thread = threading.Thread(target=self.client.start, daemon=True) + self.rsi_thread.start() + print("✅ RSI started in background.") + + def stop_rsi(self): + """Stop RSI and ensure the thread is properly stopped.""" + if not self.rsi_thread or not self.rsi_thread.is_alive(): + print("⚠️ RSI is not running.") + return + + self.client.stop() + self.rsi_thread.join(timeout=2) + print("✅ RSI stopped.") + + def update_variable(self, variable, value): + """Dynamically update a send variable.""" + try: + if value.replace('.', '', 1).isdigit(): + value = float(value) if '.' in value else int(value) + self.client.update_send_variable(variable, value) + print(f"✅ Updated {variable} to {value}") + except Exception as e: + print(f"❌ Failed to update {variable}: {e}") + + def show_variables(self): + """Display current send and receive variables.""" + print("\n📊 Current RSI Variables:") + print("SEND VARIABLES:") + for key, value in self.client.send_variables.items(): + print(f" {key}: {value}") + print("\nRECEIVE VARIABLES:") + for key, value in self.client.receive_variables.items(): + print(f" {key}: {value}") + print() + + def show_ipoc(self): + """Display the latest IPOC value.""" + print(f"🔄 Latest IPOC: {self.client.receive_variables.get('IPOC', 'N/A')}") + + def watch_network(self): + """Continuously display incoming RSI messages.""" + print("📡 Watching network traffic (Press CTRL+C to stop)...") + try: + while True: + print(self.client.receive_variables) + except KeyboardInterrupt: + print("🛑 Stopped watching network.") + + def reset_variables(self): + """Reset send variables to default values.""" + self.client.reset_send_variables() + print("✅ Send variables reset to default values.") + + def show_status(self): + """Display full RSI system status.""" + print("📄 RSI Status:") + self.show_variables() + print(f"🔌 Network: {self.client.config_parser.get_network_settings()}") + + def reconnect(self): + """Restart network connection without stopping RSI.""" + self.client.reconnect() + print("✅ Network connection restarted.") + + def toggle_digital_io(self, io, value): + """Toggle digital I/O states.""" + self.client.update_send_variable(io, int(value)) + print(f"✅ {io} set to {value}") + + def move_external_axis(self, axis, value): + """Move an external axis.""" + self.client.update_send_variable(f"ELPos.{axis}", float(value)) + print(f"✅ Moved {axis} to {value}") + + def correct_position(self, correction_type, axis, value): + """Apply correction to RKorr or AKorr.""" + self.client.update_send_variable(f"{correction_type}.{axis}", float(value)) + print(f"✅ Applied correction: {correction_type}.{axis} = {value}") + + def adjust_speed(self, tech_param, value): + """Adjust speed settings.""" + self.client.update_send_variable(tech_param, float(value)) + print(f"✅ Set {tech_param} to {value}") + + def override_safety(self, limit): + """Override safety limits.""" + print(f"⚠️ Overriding safety limit: {limit}") + def show_help(self): """Displays the list of available commands.""" print(""" Available Commands: - start - Start the RSI client in the background - stop - Stop the RSI client - exit - Stop RSI and exit CLI - help - Show this command list + start, stop, exit + set , show, ipoc, watch, reset, status, reconnect + toggle <0/1>, move_external + correct , speed + override """) if __name__ == "__main__": diff --git a/src/RSIPI/config_parser.py b/src/RSIPI/config_parser.py index d47d531..36e388c 100644 --- a/src/RSIPI/config_parser.py +++ b/src/RSIPI/config_parser.py @@ -59,44 +59,47 @@ class ConfigParser: # ✅ Extract network settings config = root.find("CONFIG") + if config is None: + raise ValueError("❌ Missing section in RSI_EthernetConfig.xml") + self.network_settings = { - "ip": config.find("IP_NUMBER").text.strip(), - "port": int(config.find("PORT").text.strip()), - "sentype": config.find("SENTYPE").text.strip(), - "onlysend": config.find("ONLYSEND").text.strip().upper() == "TRUE", + "ip": config.find("IP_NUMBER").text.strip() if config.find("IP_NUMBER") is not None else None, + "port": int(config.find("PORT").text.strip()) if config.find("PORT") is not None else None, + "sentype": config.find("SENTYPE").text.strip() if config.find("SENTYPE") is not None else None, + "onlysend": config.find("ONLYSEND").text.strip().upper() == "TRUE" if config.find( + "ONLYSEND") is not None else False, } - # ✅ Parse SEND section (values received from RSI) + # ✅ Debugging output to check loaded values + print(f"✅ Loaded network settings: {self.network_settings}") + + if None in self.network_settings.values(): + raise ValueError("❌ Missing one or more required network settings (ip, port, sentype, onlysend)") + + # ✅ Parse SEND section send_section = root.find("SEND/ELEMENTS") - for element in send_section.findall("ELEMENT"): - tag = element.get("TAG").replace("DEF_", "") - indx = element.get("INDX", "") - var_type = element.get("TYPE", "") - print(f"🔍 Processing SEND: {tag} | INDX: {indx} | TYPE: {var_type}") - if tag != "FREE": - self.process_variable_structure(send_vars, tag, var_type, indx) + if send_section is not None: + for element in send_section.findall("ELEMENT"): + tag = element.get("TAG").replace("DEF_", "") + var_type = element.get("TYPE", "") + self.process_variable_structure(send_vars, tag, var_type) - # ✅ Parse RECEIVE section (values sent to RSI) + # ✅ Parse RECEIVE section receive_section = root.find("RECEIVE/ELEMENTS") - for element in receive_section.findall("ELEMENT"): - tag = element.get("TAG").replace("DEF_", "") - indx = element.get("INDX", "") - var_type = element.get("TYPE", "") - print(f"🔍 Processing RECEIVE: {tag} | INDX: {indx} | TYPE: {var_type}") - if tag != "FREE": - self.process_variable_structure(receive_vars, tag, var_type, indx) + if receive_section is not None: + for element in receive_section.findall("ELEMENT"): + tag = element.get("TAG").replace("DEF_", "") + var_type = element.get("TYPE", "") + self.process_variable_structure(receive_vars, tag, var_type) + + print(f"✅ Final Send Variables: {send_vars}") + print(f"✅ Final Receive Variables: {receive_vars}") + + return send_vars, receive_vars - logging.info("✅ Config processed successfully.") except Exception as e: - logging.error(f"❌ Error processing config file: {e}") - - # ✅ Ensure IPOC is always in send variables - if "IPOC" not in send_vars: - send_vars["IPOC"] = 0 - - print(f"✅ Processed Send Variables: {send_vars}") - print(f"✅ Processed Receive Variables: {receive_vars}") - return send_vars, receive_vars + print(f"❌ Error processing config file: {e}") + return {}, {} def process_variable_structure(self, var_dict, tag, var_type, indx=""): """Handles structured and simple variables based on internal structure and TYPE attribute.""" diff --git a/src/RSIPI/graphing.py b/src/RSIPI/graphing.py index 68001f9..4b0cc43 100644 --- a/src/RSIPI/graphing.py +++ b/src/RSIPI/graphing.py @@ -1,5 +1,5 @@ import time -import logging +import logger import pandas as pd import matplotlib.pyplot as plt import matplotlib.animation as animation diff --git a/src/RSIPI/main.py b/src/RSIPI/main.py index 0e2a503..113dd2d 100644 --- a/src/RSIPI/main.py +++ b/src/RSIPI/main.py @@ -6,6 +6,24 @@ if __name__ == "__main__": config_file = "RSI_EthernetConfig.xml" # Ensure this file exists in the working directory client = RSIClient(config_file) client.start() + + # print("done") + # + # # client.stop() sleep(5) - # client.update_variable("EStr", "Testing 123 Testing") - print("done") \ No newline at end of file + print("rdfsfsdfsfsdfjsjfhakjshfd") + client.update_send_variable("EStr", "Testing 123 Testing") + sleep(20) + client.stop() +# from rsi_api import RSIAPI +# from time import sleep +# +# api = RSIAPI() +# api.start_rsi() +# sleep(5) +# # Dynamically update a variable +# api.update_variable("EStr", "Tessting 123") +# +# while True: +# pass +# # api.stop_rsi() \ No newline at end of file diff --git a/src/RSIPI/rsi_api.py b/src/RSIPI/rsi_api.py index 410f1fd..1cb5325 100644 --- a/src/RSIPI/rsi_api.py +++ b/src/RSIPI/rsi_api.py @@ -1,80 +1,79 @@ -import threading -import time -import inspect - - class RSIAPI: - def __init__(self): - """Initialize RSIAPI with shared send variables.""" - self.shared_send_variables = {} # Ensure this is always initialized - self.running = False # RSI status - print(f"✅ RSIAPI instance created from: {inspect.stack()[1].filename}") - print(f"✅ shared_send_variables initialized: {self.shared_send_variables}") + """RSI API for programmatic control.""" + + def __init__(self, client): + """Initialize RSIAPI with an RSI client instance.""" + self.client = client def start_rsi(self): - """Simulate RSI client startup.""" - print("\n🚀 Starting RSI Client...") - self.running = True - self.shared_send_variables["EStr"] = "RSI Started" # Default value - print(f"✅ RSI Running: {self.running}") - print(f"📌 Initial shared_send_variables: {self.shared_send_variables}") - - # Run a separate thread to simulate RSI process - rsi_thread = threading.Thread(target=self.rsi_loop, daemon=True) - rsi_thread.start() - - def rsi_loop(self): - """Simulate RSI running in the background.""" - while self.running: - print(f"🔄 RSI Loop Running... Current EStr: {self.shared_send_variables.get('EStr', 'N/A')}") - time.sleep(2) # Simulate 2-second update intervals - - def update_variable(self, variable, value): - """Update a variable in shared_send_variables.""" - print("\n🔍 Debugging update_variable()") - print(f"🔹 Checking if shared_send_variables exists: {hasattr(self, 'shared_send_variables')}") - - if not hasattr(self, "shared_send_variables"): - print("❌ Error: shared_send_variables is missing!") - return - - if variable in self.shared_send_variables: - self.shared_send_variables[variable] = value - print(f"✅ Updated {variable} to {value}") - else: - print(f"⚠️ Warning: Variable '{variable}' not found in shared_send_variables.") - print(f"📌 Available variables: {list(self.shared_send_variables.keys())}") + """Start the RSI client.""" + self.client.start() + return "✅ RSI started." def stop_rsi(self): - """Stop the RSI process.""" - print("\n🛑 Stopping RSI Client...") - self.running = False - self.shared_send_variables["EStr"] = "RSI Stopped" - print(f"✅ RSI Stopped. Final EStr: {self.shared_send_variables['EStr']}") + """Stop the RSI client.""" + self.client.stop() + return "✅ RSI stopped." + def update_variable(self, variable, value): + """Dynamically update an RSI variable.""" + try: + if isinstance(value, str) and value.replace('.', '', 1).isdigit(): + value = float(value) if '.' in value else int(value) + self.client.update_send_variable(variable, value) + return f"✅ Updated {variable} to {value}" + except Exception as e: + return f"❌ Failed to update {variable}: {e}" -# ============================== -# ✅ TEST CODE: Start RSI & Change Variables -# ============================== + def get_variables(self): + """Retrieve current send and receive variables.""" + return { + "send_variables": dict(self.client.send_variables), + "receive_variables": dict(self.client.receive_variables) + } -if __name__ == "__main__": - print("\n🚀 Starting RSIAPI Test...\n") + def get_ipoc(self): + """Retrieve the latest IPOC value.""" + return self.client.receive_variables.get("IPOC", "N/A") - # Step 1: Create an instance of RSIAPI - api = RSIAPI() + def reconnect(self): + """Restart the network connection without stopping RSI.""" + self.client.reconnect() + return "✅ Network connection restarted." - # Step 2: Start RSI - api.start_rsi() + def toggle_digital_io(self, io, value): + """Toggle digital I/O states.""" + self.client.update_send_variable(io, int(value)) + return f"✅ {io} set to {value}" - # Step 3: Wait 3 seconds and update `EStr` - time.sleep(3) - print("\n🛠 Updating 'EStr' variable to 'Testing 123'...") - api.update_variable("EStr", "Testing 123") + def move_external_axis(self, axis, value): + """Move an external axis.""" + self.client.update_send_variable(f"ELPos.{axis}", float(value)) + return f"✅ Moved {axis} to {value}" - # Step 4: Wait another 3 seconds and update `EStr` again - time.sleep(3) - print("\n🛠 Updating 'EStr' variable to 'Final Test Value'...") - api.update_variable("EStr", "Final Test Value") + def correct_position(self, correction_type, axis, value): + """Apply correction to RKorr or AKorr.""" + self.client.update_send_variable(f"{correction_type}.{axis}", float(value)) + return f"✅ Applied correction: {correction_type}.{axis} = {value}" - # Step 5: Stop RSI after 5 more seconds + def adjust_speed(self, tech_param, value): + """Adjust speed settings.""" + self.client.update_send_variable(tech_param, float(value)) + return f"✅ Set {tech_param} to {value}" + def override_safety(self, limit): + """Override safety limits.""" + return f"⚠️ Overriding safety limit: {limit}" + + def reset_variables(self): + """Reset send variables to default values.""" + self.client.reset_send_variables() + return "✅ Send variables reset to default values." + + def get_status(self): + """Retrieve full RSI system status.""" + return { + "network": self.client.config_parser.get_network_settings(), + "send_variables": dict(self.client.send_variables), + "receive_variables": dict(self.client.receive_variables) + } \ No newline at end of file