161 lines
6.7 KiB
Python
161 lines
6.7 KiB
Python
import multiprocessing
|
|
import socket
|
|
import time
|
|
import csv
|
|
import logging
|
|
import xml.etree.ElementTree as ET # ✅ FIX: Import ElementTree
|
|
from .xml_handler import XMLGenerator
|
|
from .safety_manager import SafetyManager
|
|
|
|
class NetworkProcess(multiprocessing.Process):
|
|
"""Handles UDP communication and optional CSV logging in a separate process."""
|
|
|
|
def __init__(self, ip, port, send_variables, receive_variables, stop_event, config_parser):
|
|
super().__init__()
|
|
self.send_variables = send_variables
|
|
self.receive_variables = receive_variables
|
|
self.stop_event = stop_event
|
|
self.config_parser = config_parser
|
|
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self.safety_manager = SafetyManager(config_parser.safety_limits)
|
|
|
|
self.client_address = (ip, port)
|
|
|
|
if not self.is_valid_ip(ip):
|
|
logging.warning(f"Invalid IP address '{ip}' detected. Falling back to '0.0.0.0'.")
|
|
print(f"⚠️ Invalid IP '{ip}', falling back to '0.0.0.0'.")
|
|
self.client_address = ('0.0.0.0', port)
|
|
else:
|
|
self.client_address = (ip, port)
|
|
self.logging_active = multiprocessing.Value('b', False) # Shared flag for logging
|
|
self.log_filename = multiprocessing.Array('c', 256) # Shared memory for filename
|
|
self.csv_process = None
|
|
|
|
try:
|
|
self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
self.udp_socket.bind(self.client_address)
|
|
logging.info(f"✅ Network process initialized on {self.client_address}")
|
|
except OSError as e:
|
|
logging.error(f"❌ Failed to bind to {self.client_address}: {e}")
|
|
raise
|
|
|
|
self.controller_ip_and_port = None
|
|
|
|
@staticmethod
|
|
def is_valid_ip(ip):
|
|
"""Check if an IP address is valid and can be bound on this machine."""
|
|
try:
|
|
socket.inet_aton(ip) # Validate format
|
|
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
|
|
s.bind((ip, 0)) # Try binding
|
|
return True
|
|
except (socket.error, OSError):
|
|
return False
|
|
|
|
|
|
def run(self):
|
|
"""Start the network loop."""
|
|
print("[DEBUG] Network process started.")
|
|
while not self.stop_event.is_set():
|
|
try:
|
|
self.udp_socket.settimeout(5)
|
|
data_received, self.controller_ip_and_port = self.udp_socket.recvfrom(1024)
|
|
print("Receive: ", data_received)
|
|
message = data_received.decode()
|
|
self.process_received_data(message)
|
|
print("Network :", self.send_variables)
|
|
send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings)
|
|
print("Send:", send_xml)
|
|
self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port)
|
|
|
|
# ✅ If logging is active, write data to CSV
|
|
if self.logging_active.value:
|
|
self.log_to_csv()
|
|
|
|
except socket.timeout:
|
|
print("[WARNING] No message received within timeout period.")
|
|
except Exception as e:
|
|
print(f"[ERROR] Network process error: {e}")
|
|
|
|
def process_received_data(self, xml_string):
|
|
try:
|
|
root = ET.fromstring(xml_string)
|
|
for element in root:
|
|
if element.tag in self.receive_variables:
|
|
if len(element.attrib) > 0:
|
|
self.receive_variables[element.tag] = {k: float(v) for k, v in element.attrib.items()}
|
|
else:
|
|
self.receive_variables[element.tag] = element.text
|
|
|
|
# specifically capture IPOC from received message
|
|
if element.tag == "IPOC":
|
|
received_ipoc = int(element.text)
|
|
self.receive_variables["IPOC"] = received_ipoc
|
|
self.send_variables["IPOC"] = received_ipoc + 4 # Increment by 4 ms
|
|
except Exception as e:
|
|
print(f"[ERROR] Error parsing received message: {e}")
|
|
|
|
def log_to_csv(self):
|
|
"""Write send/receive variables to the CSV log with safety flags."""
|
|
filename = self.log_filename.value.decode().strip()
|
|
if not filename:
|
|
return
|
|
|
|
try:
|
|
with open(filename, mode="a", newline="") as file:
|
|
writer = csv.writer(file)
|
|
|
|
# Write header if the file is new
|
|
if file.tell() == 0:
|
|
headers = ["Timestamp", "IPOC"]
|
|
headers += [f"Send.{k}" for k in self.send_variables.keys()]
|
|
headers += [f"Receive.{k}" for k in self.receive_variables.keys()]
|
|
headers += ["SafetyViolation"]
|
|
writer.writerow(headers)
|
|
|
|
# Gather values
|
|
timestamp = time.strftime("%d-%m-%Y %H:%M:%S")
|
|
ipoc = self.receive_variables.get("IPOC", 0)
|
|
send_data = [self.send_variables.get(k, "") for k in self.send_variables.keys()]
|
|
receive_data = [self.receive_variables.get(k, "") for k in self.receive_variables.keys()]
|
|
|
|
# 🔴 Check for safety violations
|
|
violation = False
|
|
for var in self.send_variables:
|
|
value = self.send_variables[var]
|
|
# Check structured variables
|
|
if isinstance(value, dict):
|
|
for subkey, subval in value.items():
|
|
path = f"{var}.{subkey}"
|
|
try:
|
|
self.safety_manager.validate(path, subval)
|
|
except Exception as e:
|
|
violation = str(e)
|
|
break
|
|
else:
|
|
try:
|
|
self.safety_manager.validate(var, value)
|
|
except Exception as e:
|
|
violation = str(e)
|
|
break
|
|
|
|
writer.writerow([timestamp, ipoc] + send_data + receive_data + [violation or "False"])
|
|
|
|
except Exception as e:
|
|
print(f"[ERROR] Failed to log data to CSV: {e}")
|
|
|
|
def start_logging(self, filename):
|
|
"""Start logging RSI data to CSV."""
|
|
self.logging_active.value = True
|
|
self.log_filename.value = filename.encode()
|
|
print(f"✅ CSV Logging started: {filename}")
|
|
|
|
def stop_logging(self):
|
|
"""Stop logging RSI data."""
|
|
self.logging_active.value = False
|
|
print("🛑 CSV Logging stopped.")
|
|
|
|
def is_logging_active(self):
|
|
"""Return logging status."""
|
|
return self.logging_active.value
|