Module RelayModel.StateMonitor

Expand source code
import pickle
import threading
import time
import zmq
from RelayModel import ModuleConfig, Validation, RelayLogging
from RelayModel.SortedListNode import SortedListNodeState


class StateMonitor:
    """Watch a simulated system from the outside, validate it and count node reports.

    The StateMonitor receives the state of every simulated node over a ZeroMQ REP socket, keeps the most
    recent state per node and repeatedly validates the collected states until the system is valid. Every
    request is answered with the current validity flag, so nodes learn about a valid system through the
    reply to their next report. The in-depth description is given in the thesis of this framework.

    Attributes:
        node_count (int): Number of nodes present in the simulated system.
        node_timeout_count (dict): Maps a node id to an integer that is incremented once per state message
            received from that node (the framework describes these as the timeouts executed by the node).
        nodes (dict): Maps a node id to the most recently received state of that node.
        valid (bool): False until the validation succeeded once, True afterwards.
        notify_nodes (set): Ids of nodes that reported while the system was not yet valid and therefore
            still have to receive a reply carrying ``True``. The listening loop only stops once it is empty.
        logger (logging.Logger): Logger object of this class.
        listening_thread (threading.Thread): Thread that receives and answers node states.
        analyse_thread (threading.Thread): Thread that validates the collected states.
    """

    def __init__(self, node_count):
        """Create a StateMonitor and start its two daemon threads.

        All attributes are assigned before the threads are started, because both threads read them
        immediately.

        Args:
            node_count (int): The number of nodes present in the simulated system.
        """
        self.node_count = node_count
        self.node_timeout_count = {}
        self.nodes = {}

        self.valid = False
        self._running = True
        self._context = zmq.Context()

        self.notify_nodes = set()

        self.logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL, "StateMonitor")

        self.listening_thread = threading.Thread(target=self._start_listening_thread, daemon=True)
        self.listening_thread.start()

        self.analyse_thread = threading.Thread(target=self._start_analyse_thread, daemon=True)
        self.analyse_thread.start()

    def _start_listening_thread(self):
        """Receive node states and answer each request with the validity flag.

        The loop runs while the internal running flag is set. It stops the monitor itself once more than
        five consecutive polls brought no message, at least ``node_count`` nodes have reported and no node
        is left in ``notify_nodes``.
        """
        listening_socket = self._context.socket(zmq.REP)
        try:
            listening_socket.bind("tcp://*:" + str(ModuleConfig.STATE_MONITOR_PORT))

            no_msg_count = 0
            while self._running:
                if (listening_socket.poll(ModuleConfig.POLL_TIMEOUT) & zmq.POLLIN) != 0:
                    no_msg_count = 0
                    # NOTE(security): recv_pyobj unpickles whatever arrives on the socket. This is only
                    # acceptable while every peer on this port is trusted.
                    message = listening_socket.recv_pyobj()
                    # Read the flag once: the analysis thread may flip it at any time, and the
                    # bookkeeping in _handle_state must agree with the value sent back to the node.
                    valid_now = self.valid
                    if isinstance(message, SortedListNodeState):
                        self._handle_state(message, valid=valid_now)
                    # A REP socket has to answer every request, also those that carry no node state.
                    listening_socket.send_pyobj(valid_now)
                else:
                    no_msg_count += 1

                if no_msg_count > 5 and len(self.nodes) >= self.node_count and not self.notify_nodes:
                    self.stop()
                else:
                    self.logger.debug(str(self.notify_nodes))
        finally:
            # Release the socket on normal exit as well as when an exception ends the thread.
            listening_socket.close()

    def _handle_state(self, node_state: "SortedListNodeState", valid=None):
        """Store a received node state and update the notification bookkeeping.

        Args:
            node_state (SortedListNodeState): The received state; ``node_id`` is always read, ``leaving``
                only while the system is not valid.
            valid (bool, optional): The validity flag that is sent back for this message. Defaults to the
                current value of ``self.valid``.
        """
        if valid is None:
            valid = self.valid
        node_id = node_state.node_id

        if valid or node_state.leaving:
            # Either the reply to this message carries True, or the node reports that it is leaving.
            # In both cases the node no longer has to be waited for.
            self.notify_nodes.discard(node_id)
        else:
            self.notify_nodes.add(node_id)

        self.node_timeout_count[node_id] = self.node_timeout_count.get(node_id, 0) + 1
        self.nodes[node_id] = node_state

    def _start_analyse_thread(self):
        """Validate the collected states every 0.1 seconds until the system is valid, then store the result.

        This loop depends only on ``valid``; it is not ended by ``stop``.
        """
        while not self.valid:
            if len(self.nodes) < self.node_count:
                print("Not enough messages received! {}, {}".format(self.nodes.keys(), self.node_count))
            else:
                # Validate a shallow copy because the listening thread keeps updating the dict.
                validation_result = Validation.system_has_valid_state(self.nodes.copy(), self.logger)
                if validation_result:
                    print("System is valid")
                    self.valid = True
            time.sleep(0.1)
        self._write_result_message()

    def _write_result_message(self):
        """Pickle ``[nodes, node_timeout_count]`` into a file in the results folder.

        The file is named ``<node_count>_<unix timestamp in seconds>``.
        """
        file_name = str(self.node_count) + "_" + str(int(time.time()))

        # Pickle shallow copies so that serialisation does not iterate the live dicts, and do it before
        # the file is opened so that a failing dump leaves no empty file behind.
        result_bytes = pickle.dumps([self.nodes.copy(), self.node_timeout_count.copy()])

        with open(ModuleConfig.RESULTS_FOLDER + file_name, "wb") as result_file:
            result_file.write(result_bytes)

    def stop(self):
        """Stops the StateMonitor.

        Sets the internal running flag to False. The listening loop checks this flag before every
        iteration and ends afterwards. The analysis loop does not check the flag; it ends once the
        system is valid.
        """
        self._running = False

Classes

class StateMonitor (node_count)

The StateMonitor watches the state of a simulated system, checks for validation and monitors timeouts.

The StateMonitor is a class that watches a system from the outside and monitors the simulated nodes. It stores the states of the nodes and analyses them until the system becomes valid. It also stores the timeout count of each node so that it can be analysed later. An in-depth description of the StateMonitor is given in the thesis of this framework.

Attributes

node_count : int
Stores the amount of nodes present in the system.
node_timeout_count : dict
Stores the timeout counts of each node in a dict. The key of the dict is the node id and the value is an integer counting the timeouts executed by this node.
nodes : dict
Stores the node states of each node in a dict. The key of this dict is the node id of the node and the value is a node state storing the state of this specific node.
valid : bool
Stores the state of the system. False if not valid. True if system is valid.
notify_nodes : set
Stores a set of node ids used to check whether a node still has to be notified after the system became valid. Once the system is valid, the StateMonitor waits until every node has been notified about this.
logger : logging.Logger
Holds the logger object for this class.
listening_thread : threading.Thread
Holds the Thread for listening for node states.
analyse_thread : threading.Thread
Holds the Thread for analysing the system.

With the constructor a StateMonitor object can be created.

It needs an integer value for creation. This value represents the number of nodes in the simulated system. After all initial values have been assigned, the StateMonitor starts all necessary threads. For further information see the thesis of this framework.

Args

node_count : int
The amount of nodes present in the simulated system.
Expand source code
class StateMonitor:
    """Watch a simulated system from the outside, validate it and count node reports.

    The StateMonitor receives the state of every simulated node over a ZeroMQ REP socket, keeps the most
    recent state per node and repeatedly validates the collected states until the system is valid. Every
    request is answered with the current validity flag, so nodes learn about a valid system through the
    reply to their next report. The in-depth description is given in the thesis of this framework.

    Attributes:
        node_count (int): Number of nodes present in the simulated system.
        node_timeout_count (dict): Maps a node id to an integer that is incremented once per state message
            received from that node (the framework describes these as the timeouts executed by the node).
        nodes (dict): Maps a node id to the most recently received state of that node.
        valid (bool): False until the validation succeeded once, True afterwards.
        notify_nodes (set): Ids of nodes that reported while the system was not yet valid and therefore
            still have to receive a reply carrying ``True``. The listening loop only stops once it is empty.
        logger (logging.Logger): Logger object of this class.
        listening_thread (threading.Thread): Thread that receives and answers node states.
        analyse_thread (threading.Thread): Thread that validates the collected states.
    """

    def __init__(self, node_count):
        """Create a StateMonitor and start its two daemon threads.

        All attributes are assigned before the threads are started, because both threads read them
        immediately.

        Args:
            node_count (int): The number of nodes present in the simulated system.
        """
        self.node_count = node_count
        self.node_timeout_count = {}
        self.nodes = {}

        self.valid = False
        self._running = True
        self._context = zmq.Context()

        self.notify_nodes = set()

        self.logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL, "StateMonitor")

        self.listening_thread = threading.Thread(target=self._start_listening_thread, daemon=True)
        self.listening_thread.start()

        self.analyse_thread = threading.Thread(target=self._start_analyse_thread, daemon=True)
        self.analyse_thread.start()

    def _start_listening_thread(self):
        """Receive node states and answer each request with the validity flag.

        The loop runs while the internal running flag is set. It stops the monitor itself once more than
        five consecutive polls brought no message, at least ``node_count`` nodes have reported and no node
        is left in ``notify_nodes``.
        """
        listening_socket = self._context.socket(zmq.REP)
        try:
            listening_socket.bind("tcp://*:" + str(ModuleConfig.STATE_MONITOR_PORT))

            no_msg_count = 0
            while self._running:
                if (listening_socket.poll(ModuleConfig.POLL_TIMEOUT) & zmq.POLLIN) != 0:
                    no_msg_count = 0
                    # NOTE(security): recv_pyobj unpickles whatever arrives on the socket. This is only
                    # acceptable while every peer on this port is trusted.
                    message = listening_socket.recv_pyobj()
                    # Read the flag once: the analysis thread may flip it at any time, and the
                    # bookkeeping in _handle_state must agree with the value sent back to the node.
                    valid_now = self.valid
                    if isinstance(message, SortedListNodeState):
                        self._handle_state(message, valid=valid_now)
                    # A REP socket has to answer every request, also those that carry no node state.
                    listening_socket.send_pyobj(valid_now)
                else:
                    no_msg_count += 1

                if no_msg_count > 5 and len(self.nodes) >= self.node_count and not self.notify_nodes:
                    self.stop()
                else:
                    self.logger.debug(str(self.notify_nodes))
        finally:
            # Release the socket on normal exit as well as when an exception ends the thread.
            listening_socket.close()

    def _handle_state(self, node_state: "SortedListNodeState", valid=None):
        """Store a received node state and update the notification bookkeeping.

        Args:
            node_state (SortedListNodeState): The received state; ``node_id`` is always read, ``leaving``
                only while the system is not valid.
            valid (bool, optional): The validity flag that is sent back for this message. Defaults to the
                current value of ``self.valid``.
        """
        if valid is None:
            valid = self.valid
        node_id = node_state.node_id

        if valid or node_state.leaving:
            # Either the reply to this message carries True, or the node reports that it is leaving.
            # In both cases the node no longer has to be waited for.
            self.notify_nodes.discard(node_id)
        else:
            self.notify_nodes.add(node_id)

        self.node_timeout_count[node_id] = self.node_timeout_count.get(node_id, 0) + 1
        self.nodes[node_id] = node_state

    def _start_analyse_thread(self):
        """Validate the collected states every 0.1 seconds until the system is valid, then store the result.

        This loop depends only on ``valid``; it is not ended by ``stop``.
        """
        while not self.valid:
            if len(self.nodes) < self.node_count:
                print("Not enough messages received! {}, {}".format(self.nodes.keys(), self.node_count))
            else:
                # Validate a shallow copy because the listening thread keeps updating the dict.
                validation_result = Validation.system_has_valid_state(self.nodes.copy(), self.logger)
                if validation_result:
                    print("System is valid")
                    self.valid = True
            time.sleep(0.1)
        self._write_result_message()

    def _write_result_message(self):
        """Pickle ``[nodes, node_timeout_count]`` into a file in the results folder.

        The file is named ``<node_count>_<unix timestamp in seconds>``.
        """
        file_name = str(self.node_count) + "_" + str(int(time.time()))

        # Pickle shallow copies so that serialisation does not iterate the live dicts, and do it before
        # the file is opened so that a failing dump leaves no empty file behind.
        result_bytes = pickle.dumps([self.nodes.copy(), self.node_timeout_count.copy()])

        with open(ModuleConfig.RESULTS_FOLDER + file_name, "wb") as result_file:
            result_file.write(result_bytes)

    def stop(self):
        """Stops the StateMonitor.

        Sets the internal running flag to False. The listening loop checks this flag before every
        iteration and ends afterwards. The analysis loop does not check the flag; it ends once the
        system is valid.
        """
        self._running = False

Methods

def stop(self)

Stops the StateMonitor.

The StateMonitor is stopped by setting the internal running flag to False. After that, the listening thread leaves its loop; the analysis thread does not check this flag and ends once the system is valid.

Expand source code
def stop(self):
    """Request shutdown of the StateMonitor.

    Clears the internal ``_running`` flag. The listening loop of the StateMonitor tests this flag before
    each iteration and therefore ends after the flag has been cleared.
    """
    self._running = False