Compare commits

...

2 Commits

Author SHA1 Message Date
2e2792c913 csv logging added 2025-03-15 22:46:42 +00:00
87f0a7cc8c updated api and cli commands 2025-03-15 22:25:23 +00:00
6 changed files with 341 additions and 140 deletions

View File

@ -1,14 +1,19 @@
import sys
import time
from src.RSIPI.rsi_api import RSIAPI
import os
import threading
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
from src.RSIPI.rsi_client import RSIClient
class RSICommandLineInterface:
"""Command-Line Interface for controlling RSI Client."""
def __init__(self, config_file):
"""Initialize CLI with an RSI API instance."""
self.api = RSIAPI(config_file)
self.client = RSIClient(config_file)
self.running = True
self.rsi_thread = None # Store RSI thread
def run(self):
"""Starts the CLI interaction loop."""
@ -26,25 +31,164 @@ class RSICommandLineInterface:
cmd = parts[0]
if cmd == "start":
self.api.start()
self.start_rsi()
elif cmd == "stop":
self.api.stop()
self.stop_rsi()
elif cmd == "set" and len(parts) >= 3:
variable, value = parts[1], " ".join(parts[2:])
self.update_variable(variable, value)
elif cmd == "show":
self.show_variables()
elif cmd == "ipoc":
self.show_ipoc()
elif cmd == "watch":
self.watch_network()
elif cmd == "reset":
self.reset_variables()
elif cmd == "status":
self.show_status()
elif cmd == "reconnect":
self.reconnect()
elif cmd == "toggle" and len(parts) == 3:
self.toggle_digital_io(parts[1], parts[2])
elif cmd == "move_external" and len(parts) == 3:
self.move_external_axis(parts[1], parts[2])
elif cmd == "correct" and len(parts) == 4:
self.correct_position(parts[1], parts[2], parts[3])
elif cmd == "speed" and len(parts) == 3:
self.adjust_speed(parts[1], parts[2])
elif cmd == "override" and len(parts) == 2:
self.override_safety(parts[1])
elif cmd == "log" and len(parts) >= 2:
self.handle_logging_command(parts)
elif cmd == "exit":
self.api.stop()
self.stop_rsi()
self.running = False
elif cmd == "help":
self.show_help()
else:
print("❌ Unknown command. Type 'help' for a list of commands.")
def start_rsi(self):
"""Start RSI in a separate thread to prevent CLI from freezing."""
if self.rsi_thread and self.rsi_thread.is_alive():
print("⚠️ RSI is already running.")
return
self.rsi_thread = threading.Thread(target=self.client.start, daemon=True)
self.rsi_thread.start()
print("✅ RSI started in background.")
def stop_rsi(self):
"""Stop RSI and ensure the thread is properly stopped."""
if not self.rsi_thread or not self.rsi_thread.is_alive():
print("⚠️ RSI is not running.")
return
self.client.stop()
self.rsi_thread.join(timeout=2)
print("✅ RSI stopped.")
def update_variable(self, variable, value):
"""Dynamically update a send variable."""
try:
if value.replace('.', '', 1).isdigit():
value = float(value) if '.' in value else int(value)
self.client.update_send_variable(variable, value)
print(f"✅ Updated {variable} to {value}")
except Exception as e:
print(f"❌ Failed to update {variable}: {e}")
def show_variables(self):
"""Display current send and receive variables."""
print("\n📊 Current RSI Variables:")
print("SEND VARIABLES:")
for key, value in self.client.send_variables.items():
print(f" {key}: {value}")
print("\nRECEIVE VARIABLES:")
for key, value in self.client.receive_variables.items():
print(f" {key}: {value}")
print()
def show_ipoc(self):
"""Display the latest IPOC value."""
print(f"🔄 Latest IPOC: {self.client.receive_variables.get('IPOC', 'N/A')}")
def watch_network(self):
"""Continuously display incoming RSI messages."""
print("📡 Watching network traffic (Press CTRL+C to stop)...")
try:
while True:
print(self.client.receive_variables)
except KeyboardInterrupt:
print("🛑 Stopped watching network.")
def reset_variables(self):
"""Reset send variables to default values."""
self.client.reset_send_variables()
print("✅ Send variables reset to default values.")
def show_status(self):
"""Display full RSI system status."""
print("📄 RSI Status:")
self.show_variables()
print(f"🔌 Network: {self.client.config_parser.get_network_settings()}")
def reconnect(self):
"""Restart network connection without stopping RSI."""
self.client.reconnect()
print("✅ Network connection restarted.")
def toggle_digital_io(self, io, value):
"""Toggle digital I/O states."""
self.client.update_send_variable(io, int(value))
print(f"{io} set to {value}")
def move_external_axis(self, axis, value):
"""Move an external axis."""
self.client.update_send_variable(f"ELPos.{axis}", float(value))
print(f"✅ Moved {axis} to {value}")
def correct_position(self, correction_type, axis, value):
"""Apply correction to RKorr or AKorr."""
self.client.update_send_variable(f"{correction_type}.{axis}", float(value))
print(f"✅ Applied correction: {correction_type}.{axis} = {value}")
def adjust_speed(self, tech_param, value):
"""Adjust speed settings."""
self.client.update_send_variable(tech_param, float(value))
print(f"✅ Set {tech_param} to {value}")
def override_safety(self, limit):
"""Override safety limits."""
print(f"⚠️ Overriding safety limit: {limit}")
def handle_logging_command(self, parts):
"""Handles logging-related commands."""
subcmd = parts[1]
if subcmd == "start" and len(parts) == 3:
filename = parts[2]
self.client.start_logging(filename)
print(f"✅ Logging started: {filename}")
elif subcmd == "stop":
self.client.stop_logging()
print("🛑 Logging stopped.")
elif subcmd == "status":
status = "ON" if self.client.is_logging_active() else "OFF"
print(f"📊 Logging Status: {status}")
else:
print("❌ Invalid log command. Use 'log start <file>.csv', 'log stop', or 'log status'.")
def show_help(self):
"""Displays the list of available commands."""
print("""
Available Commands:
start - Start the RSI client in the background
stop - Stop the RSI client
exit - Stop RSI and exit CLI
help - Show this command list
start, stop, exit
set <var> <value>, show, ipoc, watch, reset, status, reconnect
toggle <DiO/DiL> <0/1>, move_external <axis> <value>
correct <RKorr/AKorr> <X/Y/Z/A/B/C> <value>, speed <Tech.TX> <value>
override <limit>
log start <file>.csv, log stop, log status
""")
if __name__ == "__main__":

View File

@ -59,44 +59,47 @@ class ConfigParser:
# ✅ Extract network settings
config = root.find("CONFIG")
if config is None:
raise ValueError("❌ Missing <CONFIG> section in RSI_EthernetConfig.xml")
self.network_settings = {
"ip": config.find("IP_NUMBER").text.strip(),
"port": int(config.find("PORT").text.strip()),
"sentype": config.find("SENTYPE").text.strip(),
"onlysend": config.find("ONLYSEND").text.strip().upper() == "TRUE",
"ip": config.find("IP_NUMBER").text.strip() if config.find("IP_NUMBER") is not None else None,
"port": int(config.find("PORT").text.strip()) if config.find("PORT") is not None else None,
"sentype": config.find("SENTYPE").text.strip() if config.find("SENTYPE") is not None else None,
"onlysend": config.find("ONLYSEND").text.strip().upper() == "TRUE" if config.find(
"ONLYSEND") is not None else False,
}
# ✅ Parse SEND section (values received from RSI)
# ✅ Debugging output to check loaded values
print(f"✅ Loaded network settings: {self.network_settings}")
if None in self.network_settings.values():
raise ValueError("❌ Missing one or more required network settings (ip, port, sentype, onlysend)")
# ✅ Parse SEND section
send_section = root.find("SEND/ELEMENTS")
for element in send_section.findall("ELEMENT"):
tag = element.get("TAG").replace("DEF_", "")
indx = element.get("INDX", "")
var_type = element.get("TYPE", "")
print(f"🔍 Processing SEND: {tag} | INDX: {indx} | TYPE: {var_type}")
if tag != "FREE":
self.process_variable_structure(send_vars, tag, var_type, indx)
if send_section is not None:
for element in send_section.findall("ELEMENT"):
tag = element.get("TAG").replace("DEF_", "")
var_type = element.get("TYPE", "")
self.process_variable_structure(send_vars, tag, var_type)
# ✅ Parse RECEIVE section (values sent to RSI)
# ✅ Parse RECEIVE section
receive_section = root.find("RECEIVE/ELEMENTS")
for element in receive_section.findall("ELEMENT"):
tag = element.get("TAG").replace("DEF_", "")
indx = element.get("INDX", "")
var_type = element.get("TYPE", "")
print(f"🔍 Processing RECEIVE: {tag} | INDX: {indx} | TYPE: {var_type}")
if tag != "FREE":
self.process_variable_structure(receive_vars, tag, var_type, indx)
if receive_section is not None:
for element in receive_section.findall("ELEMENT"):
tag = element.get("TAG").replace("DEF_", "")
var_type = element.get("TYPE", "")
self.process_variable_structure(receive_vars, tag, var_type)
print(f"✅ Final Send Variables: {send_vars}")
print(f"✅ Final Receive Variables: {receive_vars}")
return send_vars, receive_vars
logging.info("✅ Config processed successfully.")
except Exception as e:
logging.error(f"❌ Error processing config file: {e}")
# ✅ Ensure IPOC is always in send variables
if "IPOC" not in send_vars:
send_vars["IPOC"] = 0
print(f"✅ Processed Send Variables: {send_vars}")
print(f"✅ Processed Receive Variables: {receive_vars}")
return send_vars, receive_vars
print(f"❌ Error processing config file: {e}")
return {}, {}
def process_variable_structure(self, var_dict, tag, var_type, indx=""):
"""Handles structured and simple variables based on internal structure and TYPE attribute."""

View File

@ -1,5 +1,5 @@
import time
import logging
import logger
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.animation as animation

View File

@ -6,6 +6,24 @@ if __name__ == "__main__":
config_file = "RSI_EthernetConfig.xml" # Ensure this file exists in the working directory
client = RSIClient(config_file)
client.start()
# print("done")
#
# # client.stop()
sleep(5)
# client.update_variable("EStr", "Testing 123 Testing")
print("done")
print("rdfsfsdfsfsdfjsjfhakjshfd")
client.update_send_variable("EStr", "Testing 123 Testing")
sleep(20)
client.stop()
# from rsi_api import RSIAPI
# from time import sleep
#
# api = RSIAPI()
# api.start_rsi()
# sleep(5)
# # Dynamically update a variable
# api.update_variable("EStr", "Tessting 123")
#
# while True:
# pass
# # api.stop_rsi()

View File

@ -1,25 +1,15 @@
import multiprocessing
import socket
import time
import csv
import logging
from config_parser import ConfigParser
import sys
import os
import socket
import xml.etree.ElementTree as ET
import multiprocessing
from xml_handler import XMLGenerator # ✅ Import XML generator module
from config_parser import ConfigParser # ✅ Import the updated config parser
import multiprocessing
import socket
import logging
import xml.etree.ElementTree as ET # ✅ FIX: Import ElementTree
from config_parser import ConfigParser
from xml_handler import XMLGenerator
class NetworkProcess(multiprocessing.Process):
"""Handles UDP communication in a separate process."""
"""Handles UDP communication and optional CSV logging in a separate process."""
def __init__(self, ip, port, send_variables, receive_variables, stop_event, config_parser):
super().__init__()
@ -29,14 +19,17 @@ class NetworkProcess(multiprocessing.Process):
self.config_parser = config_parser
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# ✅ Fallback IP handling
self.client_address = (ip, port)
if not self.is_valid_ip(ip):
logging.warning(f"Invalid IP address '{ip}' detected. Falling back to '0.0.0.0'.")
print(f"⚠️ Invalid IP '{ip}', falling back to '0.0.0.0'.")
self.client_address = ('0.0.0.0', port)
else:
self.client_address = (ip, port)
self.logging_active = multiprocessing.Value('b', False) # Shared flag for logging
self.log_filename = multiprocessing.Array('c', 256) # Shared memory for filename
self.csv_process = None
try:
self.udp_socket.bind(self.client_address)
@ -63,44 +56,74 @@ class NetworkProcess(multiprocessing.Process):
print("[DEBUG] Network process started.")
while not self.stop_event.is_set():
try:
print("[DEBUG] Waiting for incoming message...")
self.udp_socket.settimeout(5)
data_received, self.controller_ip_and_port = self.udp_socket.recvfrom(1024)
message = data_received.decode()
print(f"[DEBUG] Received message: {message}")
self.process_received_data(message)
# ✅ Generate the send XML using the updated XMLGenerator
send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings)
print(f"[DEBUG] Sending response: {send_xml}")
self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port)
# ✅ If logging is active, write data to CSV
if self.logging_active.value:
self.log_to_csv()
except socket.timeout:
print("[WARNING] No message received within timeout period.")
except Exception as e:
print(f"[ERROR] Network process error: {e}")
def stop_network(self):
"""Safely stop the network process."""
if self.udp_socket:
self.udp_socket.close()
print("✅ Network socket closed.")
def process_received_data(self, xml_string):
"""Parse incoming XML and update shared variables."""
try:
root = ET.fromstring(xml_string)
for element in root:
if element.tag in self.receive_variables:
if len(element.attrib) > 0: # ✅ Handle structured data (dictionaries)
if len(element.attrib) > 0:
self.receive_variables[element.tag] = {k: float(v) for k, v in element.attrib.items()}
else:
self.receive_variables[element.tag] = element.text
print(f"[DEBUG] Updated received variables: {self.receive_variables}")
except Exception as e:
print(f"[ERROR] Error parsing received message: {e}")
def log_to_csv(self):
"""Write send/receive variables to the CSV log."""
filename = self.log_filename.value.decode().strip()
if not filename:
return
try:
with open(filename, mode="a", newline="") as file:
writer = csv.writer(file)
# Write header if the file is new
if file.tell() == 0:
headers = ["Timestamp", "IPOC"]
headers += [f"Send.{k}" for k in self.send_variables.keys()]
headers += [f"Receive.{k}" for k in self.receive_variables.keys()]
writer.writerow(headers)
# Write current data
timestamp = time.strftime("%d-%m-%Y %H:%M:%S")
ipoc = self.receive_variables.get("IPOC", 0)
send_data = [self.send_variables.get(k, "") for k in self.send_variables.keys()]
receive_data = [self.receive_variables.get(k, "") for k in self.receive_variables.keys()]
writer.writerow([timestamp, ipoc] + send_data + receive_data)
except Exception as e:
print(f"[ERROR] Failed to log data to CSV: {e}")
def start_logging(self, filename):
"""Start logging RSI data to CSV."""
self.logging_active.value = True
self.log_filename.value = filename.encode()
print(f"✅ CSV Logging started: {filename}")
def stop_logging(self):
"""Stop logging RSI data."""
self.logging_active.value = False
print("🛑 CSV Logging stopped.")
def is_logging_active(self):
"""Return logging status."""
return self.logging_active.value

View File

@ -1,80 +1,93 @@
import threading
import time
import inspect
class RSIAPI:
def __init__(self):
"""Initialize RSIAPI with shared send variables."""
self.shared_send_variables = {} # Ensure this is always initialized
self.running = False # RSI status
print(f"✅ RSIAPI instance created from: {inspect.stack()[1].filename}")
print(f"✅ shared_send_variables initialized: {self.shared_send_variables}")
"""RSI API for programmatic control."""
def __init__(self, client):
"""Initialize RSIAPI with an RSI client instance."""
self.client = client
def start_rsi(self):
"""Simulate RSI client startup."""
print("\n🚀 Starting RSI Client...")
self.running = True
self.shared_send_variables["EStr"] = "RSI Started" # Default value
print(f"✅ RSI Running: {self.running}")
print(f"📌 Initial shared_send_variables: {self.shared_send_variables}")
# Run a separate thread to simulate RSI process
rsi_thread = threading.Thread(target=self.rsi_loop, daemon=True)
rsi_thread.start()
def rsi_loop(self):
"""Simulate RSI running in the background."""
while self.running:
print(f"🔄 RSI Loop Running... Current EStr: {self.shared_send_variables.get('EStr', 'N/A')}")
time.sleep(2) # Simulate 2-second update intervals
def update_variable(self, variable, value):
"""Update a variable in shared_send_variables."""
print("\n🔍 Debugging update_variable()")
print(f"🔹 Checking if shared_send_variables exists: {hasattr(self, 'shared_send_variables')}")
if not hasattr(self, "shared_send_variables"):
print("❌ Error: shared_send_variables is missing!")
return
if variable in self.shared_send_variables:
self.shared_send_variables[variable] = value
print(f"✅ Updated {variable} to {value}")
else:
print(f"⚠️ Warning: Variable '{variable}' not found in shared_send_variables.")
print(f"📌 Available variables: {list(self.shared_send_variables.keys())}")
"""Start the RSI client."""
self.client.start()
return "✅ RSI started."
def stop_rsi(self):
"""Stop the RSI process."""
print("\n🛑 Stopping RSI Client...")
self.running = False
self.shared_send_variables["EStr"] = "RSI Stopped"
print(f"✅ RSI Stopped. Final EStr: {self.shared_send_variables['EStr']}")
"""Stop the RSI client."""
self.client.stop()
return "✅ RSI stopped."
def update_variable(self, variable, value):
"""Dynamically update an RSI variable."""
try:
if isinstance(value, str) and value.replace('.', '', 1).isdigit():
value = float(value) if '.' in value else int(value)
self.client.update_send_variable(variable, value)
return f"✅ Updated {variable} to {value}"
except Exception as e:
return f"❌ Failed to update {variable}: {e}"
# ==============================
# ✅ TEST CODE: Start RSI & Change Variables
# ==============================
def get_variables(self):
"""Retrieve current send and receive variables."""
return {
"send_variables": dict(self.client.send_variables),
"receive_variables": dict(self.client.receive_variables)
}
if __name__ == "__main__":
print("\n🚀 Starting RSIAPI Test...\n")
def get_ipoc(self):
"""Retrieve the latest IPOC value."""
return self.client.receive_variables.get("IPOC", "N/A")
# Step 1: Create an instance of RSIAPI
api = RSIAPI()
def reconnect(self):
"""Restart the network connection without stopping RSI."""
self.client.reconnect()
return "✅ Network connection restarted."
# Step 2: Start RSI
api.start_rsi()
def toggle_digital_io(self, io, value):
"""Toggle digital I/O states."""
self.client.update_send_variable(io, int(value))
return f"{io} set to {value}"
# Step 3: Wait 3 seconds and update `EStr`
time.sleep(3)
print("\n🛠 Updating 'EStr' variable to 'Testing 123'...")
api.update_variable("EStr", "Testing 123")
def move_external_axis(self, axis, value):
"""Move an external axis."""
self.client.update_send_variable(f"ELPos.{axis}", float(value))
return f"✅ Moved {axis} to {value}"
# Step 4: Wait another 3 seconds and update `EStr` again
time.sleep(3)
print("\n🛠 Updating 'EStr' variable to 'Final Test Value'...")
api.update_variable("EStr", "Final Test Value")
def correct_position(self, correction_type, axis, value):
"""Apply correction to RKorr or AKorr."""
self.client.update_send_variable(f"{correction_type}.{axis}", float(value))
return f"✅ Applied correction: {correction_type}.{axis} = {value}"
# Step 5: Stop RSI after 5 more seconds
def adjust_speed(self, tech_param, value):
"""Adjust speed settings."""
self.client.update_send_variable(tech_param, float(value))
return f"✅ Set {tech_param} to {value}"
def override_safety(self, limit):
"""Override safety limits."""
return f"⚠️ Overriding safety limit: {limit}"
def reset_variables(self):
"""Reset send variables to default values."""
self.client.reset_send_variables()
return "✅ Send variables reset to default values."
def get_status(self):
"""Retrieve full RSI system status."""
return {
"network": self.client.config_parser.get_network_settings(),
"send_variables": dict(self.client.send_variables),
"receive_variables": dict(self.client.receive_variables)
}
def start_logging(self, filename):
"""Start logging RSI data to CSV."""
self.client.start_logging(filename)
return f"✅ CSV Logging started: {filename}"
def stop_logging(self):
"""Stop logging RSI data."""
self.client.stop_logging()
return "🛑 CSV Logging stopped."
def is_logging_active(self):
"""Return logging status."""
return self.client.is_logging_active()