updated api and cli commands

This commit is contained in:
Adam 2025-03-15 22:25:23 +00:00
parent 622cd7801d
commit 87f0a7cc8c
5 changed files with 254 additions and 109 deletions

View File

@ -1,14 +1,19 @@
import sys import sys
import time import os
from src.RSIPI.rsi_api import RSIAPI import threading
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
from src.RSIPI.rsi_client import RSIClient
class RSICommandLineInterface: class RSICommandLineInterface:
"""Command-Line Interface for controlling RSI Client.""" """Command-Line Interface for controlling RSI Client."""
def __init__(self, config_file): def __init__(self, config_file):
"""Initialize CLI with an RSI API instance.""" """Initialize CLI with an RSI API instance."""
self.api = RSIAPI(config_file) self.client = RSIClient(config_file)
self.running = True self.running = True
self.rsi_thread = None # Store RSI thread
def run(self): def run(self):
"""Starts the CLI interaction loop.""" """Starts the CLI interaction loop."""
@ -26,25 +31,145 @@ class RSICommandLineInterface:
cmd = parts[0] cmd = parts[0]
if cmd == "start": if cmd == "start":
self.api.start() self.start_rsi()
elif cmd == "stop": elif cmd == "stop":
self.api.stop() self.stop_rsi()
elif cmd == "set" and len(parts) >= 3:
variable, value = parts[1], " ".join(parts[2:])
self.update_variable(variable, value)
elif cmd == "show":
self.show_variables()
elif cmd == "ipoc":
self.show_ipoc()
elif cmd == "watch":
self.watch_network()
elif cmd == "reset":
self.reset_variables()
elif cmd == "status":
self.show_status()
elif cmd == "reconnect":
self.reconnect()
elif cmd == "toggle" and len(parts) == 3:
self.toggle_digital_io(parts[1], parts[2])
elif cmd == "move_external" and len(parts) == 3:
self.move_external_axis(parts[1], parts[2])
elif cmd == "correct" and len(parts) == 4:
self.correct_position(parts[1], parts[2], parts[3])
elif cmd == "speed" and len(parts) == 3:
self.adjust_speed(parts[1], parts[2])
elif cmd == "override" and len(parts) == 2:
self.override_safety(parts[1])
elif cmd == "exit": elif cmd == "exit":
self.api.stop() self.stop_rsi()
self.running = False self.running = False
elif cmd == "help": elif cmd == "help":
self.show_help() self.show_help()
else: else:
print("❌ Unknown command. Type 'help' for a list of commands.") print("❌ Unknown command. Type 'help' for a list of commands.")
def start_rsi(self):
"""Start RSI in a separate thread to prevent CLI from freezing."""
if self.rsi_thread and self.rsi_thread.is_alive():
print("⚠️ RSI is already running.")
return
self.rsi_thread = threading.Thread(target=self.client.start, daemon=True)
self.rsi_thread.start()
print("✅ RSI started in background.")
def stop_rsi(self):
"""Stop RSI and ensure the thread is properly stopped."""
if not self.rsi_thread or not self.rsi_thread.is_alive():
print("⚠️ RSI is not running.")
return
self.client.stop()
self.rsi_thread.join(timeout=2)
print("✅ RSI stopped.")
def update_variable(self, variable, value):
"""Dynamically update a send variable."""
try:
if value.replace('.', '', 1).isdigit():
value = float(value) if '.' in value else int(value)
self.client.update_send_variable(variable, value)
print(f"✅ Updated {variable} to {value}")
except Exception as e:
print(f"❌ Failed to update {variable}: {e}")
def show_variables(self):
"""Display current send and receive variables."""
print("\n📊 Current RSI Variables:")
print("SEND VARIABLES:")
for key, value in self.client.send_variables.items():
print(f" {key}: {value}")
print("\nRECEIVE VARIABLES:")
for key, value in self.client.receive_variables.items():
print(f" {key}: {value}")
print()
def show_ipoc(self):
"""Display the latest IPOC value."""
print(f"🔄 Latest IPOC: {self.client.receive_variables.get('IPOC', 'N/A')}")
def watch_network(self):
"""Continuously display incoming RSI messages."""
print("📡 Watching network traffic (Press CTRL+C to stop)...")
try:
while True:
print(self.client.receive_variables)
except KeyboardInterrupt:
print("🛑 Stopped watching network.")
def reset_variables(self):
"""Reset send variables to default values."""
self.client.reset_send_variables()
print("✅ Send variables reset to default values.")
def show_status(self):
"""Display full RSI system status."""
print("📄 RSI Status:")
self.show_variables()
print(f"🔌 Network: {self.client.config_parser.get_network_settings()}")
def reconnect(self):
"""Restart network connection without stopping RSI."""
self.client.reconnect()
print("✅ Network connection restarted.")
def toggle_digital_io(self, io, value):
"""Toggle digital I/O states."""
self.client.update_send_variable(io, int(value))
print(f"{io} set to {value}")
def move_external_axis(self, axis, value):
"""Move an external axis."""
self.client.update_send_variable(f"ELPos.{axis}", float(value))
print(f"✅ Moved {axis} to {value}")
def correct_position(self, correction_type, axis, value):
"""Apply correction to RKorr or AKorr."""
self.client.update_send_variable(f"{correction_type}.{axis}", float(value))
print(f"✅ Applied correction: {correction_type}.{axis} = {value}")
def adjust_speed(self, tech_param, value):
"""Adjust speed settings."""
self.client.update_send_variable(tech_param, float(value))
print(f"✅ Set {tech_param} to {value}")
def override_safety(self, limit):
"""Override safety limits."""
print(f"⚠️ Overriding safety limit: {limit}")
def show_help(self): def show_help(self):
"""Displays the list of available commands.""" """Displays the list of available commands."""
print(""" print("""
Available Commands: Available Commands:
start - Start the RSI client in the background start, stop, exit
stop - Stop the RSI client set <var> <value>, show, ipoc, watch, reset, status, reconnect
exit - Stop RSI and exit CLI toggle <DiO/DiL> <0/1>, move_external <axis> <value>
help - Show this command list correct <RKorr/AKorr> <X/Y/Z/A/B/C> <value>, speed <Tech.TX> <value>
override <limit>
""") """)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -59,44 +59,47 @@ class ConfigParser:
# ✅ Extract network settings # ✅ Extract network settings
config = root.find("CONFIG") config = root.find("CONFIG")
if config is None:
raise ValueError("❌ Missing <CONFIG> section in RSI_EthernetConfig.xml")
self.network_settings = { self.network_settings = {
"ip": config.find("IP_NUMBER").text.strip(), "ip": config.find("IP_NUMBER").text.strip() if config.find("IP_NUMBER") is not None else None,
"port": int(config.find("PORT").text.strip()), "port": int(config.find("PORT").text.strip()) if config.find("PORT") is not None else None,
"sentype": config.find("SENTYPE").text.strip(), "sentype": config.find("SENTYPE").text.strip() if config.find("SENTYPE") is not None else None,
"onlysend": config.find("ONLYSEND").text.strip().upper() == "TRUE", "onlysend": config.find("ONLYSEND").text.strip().upper() == "TRUE" if config.find(
"ONLYSEND") is not None else False,
} }
# ✅ Parse SEND section (values received from RSI) # ✅ Debugging output to check loaded values
print(f"✅ Loaded network settings: {self.network_settings}")
if None in self.network_settings.values():
raise ValueError("❌ Missing one or more required network settings (ip, port, sentype, onlysend)")
# ✅ Parse SEND section
send_section = root.find("SEND/ELEMENTS") send_section = root.find("SEND/ELEMENTS")
for element in send_section.findall("ELEMENT"): if send_section is not None:
tag = element.get("TAG").replace("DEF_", "") for element in send_section.findall("ELEMENT"):
indx = element.get("INDX", "") tag = element.get("TAG").replace("DEF_", "")
var_type = element.get("TYPE", "") var_type = element.get("TYPE", "")
print(f"🔍 Processing SEND: {tag} | INDX: {indx} | TYPE: {var_type}") self.process_variable_structure(send_vars, tag, var_type)
if tag != "FREE":
self.process_variable_structure(send_vars, tag, var_type, indx)
# ✅ Parse RECEIVE section (values sent to RSI) # ✅ Parse RECEIVE section
receive_section = root.find("RECEIVE/ELEMENTS") receive_section = root.find("RECEIVE/ELEMENTS")
for element in receive_section.findall("ELEMENT"): if receive_section is not None:
tag = element.get("TAG").replace("DEF_", "") for element in receive_section.findall("ELEMENT"):
indx = element.get("INDX", "") tag = element.get("TAG").replace("DEF_", "")
var_type = element.get("TYPE", "") var_type = element.get("TYPE", "")
print(f"🔍 Processing RECEIVE: {tag} | INDX: {indx} | TYPE: {var_type}") self.process_variable_structure(receive_vars, tag, var_type)
if tag != "FREE":
self.process_variable_structure(receive_vars, tag, var_type, indx) print(f"✅ Final Send Variables: {send_vars}")
print(f"✅ Final Receive Variables: {receive_vars}")
return send_vars, receive_vars
logging.info("✅ Config processed successfully.")
except Exception as e: except Exception as e:
logging.error(f"❌ Error processing config file: {e}") print(f"❌ Error processing config file: {e}")
return {}, {}
# ✅ Ensure IPOC is always in send variables
if "IPOC" not in send_vars:
send_vars["IPOC"] = 0
print(f"✅ Processed Send Variables: {send_vars}")
print(f"✅ Processed Receive Variables: {receive_vars}")
return send_vars, receive_vars
def process_variable_structure(self, var_dict, tag, var_type, indx=""): def process_variable_structure(self, var_dict, tag, var_type, indx=""):
"""Handles structured and simple variables based on internal structure and TYPE attribute.""" """Handles structured and simple variables based on internal structure and TYPE attribute."""

View File

@ -1,5 +1,5 @@
import time import time
import logging import logger
import pandas as pd import pandas as pd
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import matplotlib.animation as animation import matplotlib.animation as animation

View File

@ -6,6 +6,24 @@ if __name__ == "__main__":
config_file = "RSI_EthernetConfig.xml" # Ensure this file exists in the working directory config_file = "RSI_EthernetConfig.xml" # Ensure this file exists in the working directory
client = RSIClient(config_file) client = RSIClient(config_file)
client.start() client.start()
# print("done")
#
# # client.stop()
sleep(5) sleep(5)
# client.update_variable("EStr", "Testing 123 Testing") print("rdfsfsdfsfsdfjsjfhakjshfd")
print("done") client.update_send_variable("EStr", "Testing 123 Testing")
sleep(20)
client.stop()
# from rsi_api import RSIAPI
# from time import sleep
#
# api = RSIAPI()
# api.start_rsi()
# sleep(5)
# # Dynamically update a variable
# api.update_variable("EStr", "Tessting 123")
#
# while True:
# pass
# # api.stop_rsi()

View File

@ -1,80 +1,79 @@
import threading
import time
import inspect
class RSIAPI: class RSIAPI:
def __init__(self): """RSI API for programmatic control."""
"""Initialize RSIAPI with shared send variables."""
self.shared_send_variables = {} # Ensure this is always initialized def __init__(self, client):
self.running = False # RSI status """Initialize RSIAPI with an RSI client instance."""
print(f"✅ RSIAPI instance created from: {inspect.stack()[1].filename}") self.client = client
print(f"✅ shared_send_variables initialized: {self.shared_send_variables}")
def start_rsi(self): def start_rsi(self):
"""Simulate RSI client startup.""" """Start the RSI client."""
print("\n🚀 Starting RSI Client...") self.client.start()
self.running = True return "✅ RSI started."
self.shared_send_variables["EStr"] = "RSI Started" # Default value
print(f"✅ RSI Running: {self.running}")
print(f"📌 Initial shared_send_variables: {self.shared_send_variables}")
# Run a separate thread to simulate RSI process
rsi_thread = threading.Thread(target=self.rsi_loop, daemon=True)
rsi_thread.start()
def rsi_loop(self):
"""Simulate RSI running in the background."""
while self.running:
print(f"🔄 RSI Loop Running... Current EStr: {self.shared_send_variables.get('EStr', 'N/A')}")
time.sleep(2) # Simulate 2-second update intervals
def update_variable(self, variable, value):
"""Update a variable in shared_send_variables."""
print("\n🔍 Debugging update_variable()")
print(f"🔹 Checking if shared_send_variables exists: {hasattr(self, 'shared_send_variables')}")
if not hasattr(self, "shared_send_variables"):
print("❌ Error: shared_send_variables is missing!")
return
if variable in self.shared_send_variables:
self.shared_send_variables[variable] = value
print(f"✅ Updated {variable} to {value}")
else:
print(f"⚠️ Warning: Variable '{variable}' not found in shared_send_variables.")
print(f"📌 Available variables: {list(self.shared_send_variables.keys())}")
def stop_rsi(self): def stop_rsi(self):
"""Stop the RSI process.""" """Stop the RSI client."""
print("\n🛑 Stopping RSI Client...") self.client.stop()
self.running = False return "✅ RSI stopped."
self.shared_send_variables["EStr"] = "RSI Stopped"
print(f"✅ RSI Stopped. Final EStr: {self.shared_send_variables['EStr']}")
def update_variable(self, variable, value):
"""Dynamically update an RSI variable."""
try:
if isinstance(value, str) and value.replace('.', '', 1).isdigit():
value = float(value) if '.' in value else int(value)
self.client.update_send_variable(variable, value)
return f"✅ Updated {variable} to {value}"
except Exception as e:
return f"❌ Failed to update {variable}: {e}"
# ============================== def get_variables(self):
# ✅ TEST CODE: Start RSI & Change Variables """Retrieve current send and receive variables."""
# ============================== return {
"send_variables": dict(self.client.send_variables),
"receive_variables": dict(self.client.receive_variables)
}
if __name__ == "__main__": def get_ipoc(self):
print("\n🚀 Starting RSIAPI Test...\n") """Retrieve the latest IPOC value."""
return self.client.receive_variables.get("IPOC", "N/A")
# Step 1: Create an instance of RSIAPI def reconnect(self):
api = RSIAPI() """Restart the network connection without stopping RSI."""
self.client.reconnect()
return "✅ Network connection restarted."
# Step 2: Start RSI def toggle_digital_io(self, io, value):
api.start_rsi() """Toggle digital I/O states."""
self.client.update_send_variable(io, int(value))
return f"{io} set to {value}"
# Step 3: Wait 3 seconds and update `EStr` def move_external_axis(self, axis, value):
time.sleep(3) """Move an external axis."""
print("\n🛠 Updating 'EStr' variable to 'Testing 123'...") self.client.update_send_variable(f"ELPos.{axis}", float(value))
api.update_variable("EStr", "Testing 123") return f"✅ Moved {axis} to {value}"
# Step 4: Wait another 3 seconds and update `EStr` again def correct_position(self, correction_type, axis, value):
time.sleep(3) """Apply correction to RKorr or AKorr."""
print("\n🛠 Updating 'EStr' variable to 'Final Test Value'...") self.client.update_send_variable(f"{correction_type}.{axis}", float(value))
api.update_variable("EStr", "Final Test Value") return f"✅ Applied correction: {correction_type}.{axis} = {value}"
# Step 5: Stop RSI after 5 more seconds def adjust_speed(self, tech_param, value):
"""Adjust speed settings."""
self.client.update_send_variable(tech_param, float(value))
return f"✅ Set {tech_param} to {value}"
def override_safety(self, limit):
"""Override safety limits."""
return f"⚠️ Overriding safety limit: {limit}"
def reset_variables(self):
"""Reset send variables to default values."""
self.client.reset_send_variables()
return "✅ Send variables reset to default values."
def get_status(self):
"""Retrieve full RSI system status."""
return {
"network": self.client.config_parser.get_network_settings(),
"send_variables": dict(self.client.send_variables),
"receive_variables": dict(self.client.receive_variables)
}