Module RelayModel.StateMonitor
Expand source code
import pickle
import threading
import time
import zmq
from RelayModel import ModuleConfig, Validation, RelayLogging
from RelayModel.SortedListNode import SortedListNodeState
class StateMonitor:
    """The StateMonitor watches the state of a simulated system, checks for validation and monitors timeouts.

    The StateMonitor watches a system from outside and monitors the simulated nodes. It stores the states reported by
    the nodes and analyses them until the system becomes valid. It also keeps a per-node timeout count so it can be
    analysed later. The in-depth description of the StateMonitor is given in the thesis of this framework.

    Attributes:
        node_count (int): Stores the number of nodes present in the system.
        node_timeout_count (dict): Stores the timeout count of each node. The key is the node id and the value is an
            integer that is incremented once per state report received from that node (presumably one report per
            node timeout).
        nodes (dict): Stores the latest state of each node. The key is the node id and the value is the last node
            state reported by that node.
        valid (bool): Stores the state of the system. False if not valid, True once the system has been found valid.
        notify_nodes (set): Stores the ids of the nodes that still have to be told that the system is valid. The
            listening loop only stops itself once this set is empty.
        logger (logging.Logger): Holds the logger object for this class.
        listening_thread (threading.Thread): Holds the thread that listens for node states.
        analyse_thread (threading.Thread): Holds the thread that analyses the system.
    """

    # The listening loop may stop itself after MORE than this many consecutive empty polls
    # (each lasting ModuleConfig.POLL_TIMEOUT milliseconds).
    _IDLE_POLLS_BEFORE_STOP = 5
    # Upper bound in milliseconds that closing the reply socket may wait to flush a pending reply.
    _SOCKET_LINGER_MS = 1000

    def __init__(self, node_count: int):
        """Create a StateMonitor and start its listening and analyse threads.

        For further information see the thesis of this framework.

        Args:
            node_count (int): The number of nodes present in the simulated system.
        """
        self.node_count = node_count
        self.node_timeout_count = {}
        self.nodes = {}
        self.valid = False
        self._running = True
        self._context = zmq.Context()
        self.notify_nodes = set()
        self.logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL, "StateMonitor")
        self.listening_thread = threading.Thread(target=self._start_listening_thread, daemon=True)
        self.listening_thread.start()
        self.analyse_thread = threading.Thread(target=self._start_analyse_thread, daemon=True)
        self.analyse_thread.start()

    def _start_listening_thread(self):
        """Answer node state reports on a ZeroMQ REP socket until the monitor is stopped.

        The socket is bound to ``tcp://*:<ModuleConfig.STATE_MONITOR_PORT>``. Each request is answered with the
        validity flag. A REP socket must send exactly one reply per received request before it can receive again,
        so a reply is sent even when the message is not a SortedListNodeState.

        The loop stops itself after more than ``_IDLE_POLLS_BEFORE_STOP`` consecutive empty polls, but only once every
        node has reported and no node is left in ``notify_nodes``. On exit the socket is closed and the context is
        terminated.
        """
        listening_socket = self._context.socket(zmq.REP)
        try:
            listening_socket.bind("tcp://*:" + str(ModuleConfig.STATE_MONITOR_PORT))
            idle_polls = 0
            while self._running:
                if listening_socket.poll(ModuleConfig.POLL_TIMEOUT) & zmq.POLLIN:
                    idle_polls = 0
                    # NOTE(review): recv_pyobj unpickles data from any peer that can reach this port
                    # (bound on all interfaces); only run the monitor on a trusted network.
                    message = listening_socket.recv_pyobj()
                    if isinstance(message, SortedListNodeState):
                        reply = self._handle_state(message)
                    else:
                        reply = self.valid
                    listening_socket.send_pyobj(reply)
                else:
                    idle_polls += 1
                    if (idle_polls > self._IDLE_POLLS_BEFORE_STOP
                            and len(self.nodes) >= self.node_count
                            and not self.notify_nodes):
                        self.stop()
                    else:
                        self.logger.debug("%s", self.notify_nodes)
        finally:
            listening_socket.close(linger=self._SOCKET_LINGER_MS)
            self._context.term()

    def _handle_state(self, node_state: "SortedListNodeState"):
        """Record a node state report and update the notification bookkeeping.

        While the system is not valid, a non-leaving node is added to ``notify_nodes`` and a leaving node is
        removed from it. Once the system is valid, the reporting node is removed because it is about to be told.
        Every report increments the node's timeout count and replaces its stored state.

        Args:
            node_state (SortedListNodeState): The state reported by a node. Its ``node_id`` and ``leaving`` are read.

        Returns:
            bool: The validity flag that the bookkeeping decision was based on. The caller replies with exactly
            this value, so a concurrent flip of ``valid`` by the analyse thread cannot tell a node "valid" while
            leaving it in ``notify_nodes``.
        """
        # Read the shared flag exactly once; see Returns.
        valid = self.valid
        node_id = node_state.node_id
        if valid or node_state.leaving:
            # Either the node is being told right now, or it is leaving and cannot be told later.
            self.notify_nodes.discard(node_id)
        else:
            self.notify_nodes.add(node_id)
        self.node_timeout_count[node_id] = self.node_timeout_count.get(node_id, 0) + 1
        self.nodes[node_id] = node_state
        return valid

    def _start_analyse_thread(self):
        """Check the collected node states every 0.1 s until the system is valid, then write the result file.

        The loop also ends when the monitor is stopped. In that case no result file is written unless the system
        had already been found valid.
        """
        while self._running and not self.valid:
            if len(self.nodes) < self.node_count:
                print("Not enough messages received! {}, {}".format(self.nodes.keys(), self.node_count))
            elif Validation.system_has_valid_state(self.nodes.copy(), self.logger):
                print("System is valid")
                self.valid = True
            time.sleep(0.1)
        if self.valid:
            self._write_result_message()

    def _write_result_message(self):
        """Pickle ``[nodes, node_timeout_count]`` into a new file in ``ModuleConfig.RESULTS_FOLDER``.

        The file name is ``<node_count>_<unix timestamp in seconds>``.
        """
        file_name = "{}_{}".format(self.node_count, int(time.time()))
        # Snapshot both dicts: the listening thread may still record late reports while we pickle.
        result_list = [self.nodes.copy(), self.node_timeout_count.copy()]
        # Pickle before opening so a pickling failure does not leave an empty result file behind.
        result_bytes = pickle.dumps(result_list)
        with open(ModuleConfig.RESULTS_FOLDER + file_name, "wb") as result_file:
            result_file.write(result_bytes)

    def stop(self):
        """Stops the StateMonitor.

        Clears the internal running flag. The listening thread notices it after its current poll, closes its socket
        and terminates the ZeroMQ context. The analyse thread leaves its loop on its next iteration. The threads are
        not joined.
        """
        self._running = False
Classes
class StateMonitor(node_count)
The StateMonitor watches the state of a simulated system, checks for validation and monitors timeouts.
The StateMonitor is a class that watches a system from outside and monitors the simulated nodes. It stores the states of the nodes and analyses them until the system becomes valid. It also stores the timeout count of each node so that it can be analysed later. The in-depth description of the StateMonitor is given in the thesis of this framework.
Attributes
node_count (int): Stores the number of nodes present in the system.
node_timeout_count (dict): Stores the timeout count of each node. The key of the dict is the node id and the value is an integer counting the timeouts executed by this node.
nodes (dict): Stores the state of each node. The key of the dict is the node id and the value is a node state storing the state of this specific node.
valid (bool): Stores the state of the system: False if the system is not valid, True if it is valid.
notify_nodes (set): Stores the ids of the nodes that still have to be notified once the system has become valid. After the system has become valid, the StateMonitor waits until every node has been notified.
logger (logging.Logger): Holds the logger object for this class.
listening_thread (threading.Thread): Holds the thread that listens for node states.
analyse_thread (threading.Thread): Holds the thread that analyses the system.
With the constructor a StateMonitor object can be created.
It needs an integer value for creation. This value represents the number of nodes in the simulated system. After all initial values are assigned, the StateMonitor starts all necessary threads. For further information see the thesis of this framework.
Args
node_count (int): The number of nodes present in the simulated system.
Expand source code
class StateMonitor:
    """The StateMonitor watches the state of a simulated system, checks for validation and monitors timeouts.

    The StateMonitor watches a system from outside and monitors the simulated nodes. It stores the states reported by
    the nodes and analyses them until the system becomes valid. It also keeps a per-node timeout count so it can be
    analysed later. The in-depth description of the StateMonitor is given in the thesis of this framework.

    Attributes:
        node_count (int): Stores the number of nodes present in the system.
        node_timeout_count (dict): Stores the timeout count of each node. The key is the node id and the value is an
            integer that is incremented once per state report received from that node (presumably one report per
            node timeout).
        nodes (dict): Stores the latest state of each node. The key is the node id and the value is the last node
            state reported by that node.
        valid (bool): Stores the state of the system. False if not valid, True once the system has been found valid.
        notify_nodes (set): Stores the ids of the nodes that still have to be told that the system is valid. The
            listening loop only stops itself once this set is empty.
        logger (logging.Logger): Holds the logger object for this class.
        listening_thread (threading.Thread): Holds the thread that listens for node states.
        analyse_thread (threading.Thread): Holds the thread that analyses the system.
    """

    # The listening loop may stop itself after MORE than this many consecutive empty polls
    # (each lasting ModuleConfig.POLL_TIMEOUT milliseconds).
    _IDLE_POLLS_BEFORE_STOP = 5
    # Upper bound in milliseconds that closing the reply socket may wait to flush a pending reply.
    _SOCKET_LINGER_MS = 1000

    def __init__(self, node_count: int):
        """Create a StateMonitor and start its listening and analyse threads.

        For further information see the thesis of this framework.

        Args:
            node_count (int): The number of nodes present in the simulated system.
        """
        self.node_count = node_count
        self.node_timeout_count = {}
        self.nodes = {}
        self.valid = False
        self._running = True
        self._context = zmq.Context()
        self.notify_nodes = set()
        self.logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL, "StateMonitor")
        self.listening_thread = threading.Thread(target=self._start_listening_thread, daemon=True)
        self.listening_thread.start()
        self.analyse_thread = threading.Thread(target=self._start_analyse_thread, daemon=True)
        self.analyse_thread.start()

    def _start_listening_thread(self):
        """Answer node state reports on a ZeroMQ REP socket until the monitor is stopped.

        The socket is bound to ``tcp://*:<ModuleConfig.STATE_MONITOR_PORT>``. Each request is answered with the
        validity flag. A REP socket must send exactly one reply per received request before it can receive again,
        so a reply is sent even when the message is not a SortedListNodeState.

        The loop stops itself after more than ``_IDLE_POLLS_BEFORE_STOP`` consecutive empty polls, but only once every
        node has reported and no node is left in ``notify_nodes``. On exit the socket is closed and the context is
        terminated.
        """
        listening_socket = self._context.socket(zmq.REP)
        try:
            listening_socket.bind("tcp://*:" + str(ModuleConfig.STATE_MONITOR_PORT))
            idle_polls = 0
            while self._running:
                if listening_socket.poll(ModuleConfig.POLL_TIMEOUT) & zmq.POLLIN:
                    idle_polls = 0
                    # NOTE(review): recv_pyobj unpickles data from any peer that can reach this port
                    # (bound on all interfaces); only run the monitor on a trusted network.
                    message = listening_socket.recv_pyobj()
                    if isinstance(message, SortedListNodeState):
                        reply = self._handle_state(message)
                    else:
                        reply = self.valid
                    listening_socket.send_pyobj(reply)
                else:
                    idle_polls += 1
                    if (idle_polls > self._IDLE_POLLS_BEFORE_STOP
                            and len(self.nodes) >= self.node_count
                            and not self.notify_nodes):
                        self.stop()
                    else:
                        self.logger.debug("%s", self.notify_nodes)
        finally:
            listening_socket.close(linger=self._SOCKET_LINGER_MS)
            self._context.term()

    def _handle_state(self, node_state: "SortedListNodeState"):
        """Record a node state report and update the notification bookkeeping.

        While the system is not valid, a non-leaving node is added to ``notify_nodes`` and a leaving node is
        removed from it. Once the system is valid, the reporting node is removed because it is about to be told.
        Every report increments the node's timeout count and replaces its stored state.

        Args:
            node_state (SortedListNodeState): The state reported by a node. Its ``node_id`` and ``leaving`` are read.

        Returns:
            bool: The validity flag that the bookkeeping decision was based on. The caller replies with exactly
            this value, so a concurrent flip of ``valid`` by the analyse thread cannot tell a node "valid" while
            leaving it in ``notify_nodes``.
        """
        # Read the shared flag exactly once; see Returns.
        valid = self.valid
        node_id = node_state.node_id
        if valid or node_state.leaving:
            # Either the node is being told right now, or it is leaving and cannot be told later.
            self.notify_nodes.discard(node_id)
        else:
            self.notify_nodes.add(node_id)
        self.node_timeout_count[node_id] = self.node_timeout_count.get(node_id, 0) + 1
        self.nodes[node_id] = node_state
        return valid

    def _start_analyse_thread(self):
        """Check the collected node states every 0.1 s until the system is valid, then write the result file.

        The loop also ends when the monitor is stopped. In that case no result file is written unless the system
        had already been found valid.
        """
        while self._running and not self.valid:
            if len(self.nodes) < self.node_count:
                print("Not enough messages received! {}, {}".format(self.nodes.keys(), self.node_count))
            elif Validation.system_has_valid_state(self.nodes.copy(), self.logger):
                print("System is valid")
                self.valid = True
            time.sleep(0.1)
        if self.valid:
            self._write_result_message()

    def _write_result_message(self):
        """Pickle ``[nodes, node_timeout_count]`` into a new file in ``ModuleConfig.RESULTS_FOLDER``.

        The file name is ``<node_count>_<unix timestamp in seconds>``.
        """
        file_name = "{}_{}".format(self.node_count, int(time.time()))
        # Snapshot both dicts: the listening thread may still record late reports while we pickle.
        result_list = [self.nodes.copy(), self.node_timeout_count.copy()]
        # Pickle before opening so a pickling failure does not leave an empty result file behind.
        result_bytes = pickle.dumps(result_list)
        with open(ModuleConfig.RESULTS_FOLDER + file_name, "wb") as result_file:
            result_file.write(result_bytes)

    def stop(self):
        """Stops the StateMonitor.

        Clears the internal running flag. The listening thread notices it after its current poll, closes its socket
        and terminates the ZeroMQ context. The analyse thread leaves its loop on its next iteration. The threads are
        not joined.
        """
        self._running = False
def stop(self)
Stops the StateMonitor.
The StateMonitor is stopped by setting the internal running flag to False. Threads that check this flag leave their loops on their next iteration; the threads are not joined.
Expand source code
def stop(self):
    """Stops the StateMonitor.

    Sets the internal running flag to False. Threads whose loops check this flag leave those loops on their next
    iteration. This method does not wait for or join any thread.
    """
    self._running = False