Module RelayModel.Node

Expand source code
import multiprocessing
import queue
import threading
import time

import zmq

from RelayModel import ModuleConfig, RelayLogging
from RelayModel.Communication import Action
from RelayModel.RelayId import RelayLayerId, RelayId
from RelayModel.RelayLayer import RelayLayer


class Node:
    """The Node class represents a Node in a distributed System.

    The Node class is the main class for creating distributed protocols with the relay model.
    This class implements functionality to work with the relay model and leave a system without harming its
    connectivity. It implements the protocol provided by Setzer in his phd thesis:
    (https://digital.ub.uni-paderborn.de/urn/urn:nbn:de:hbz:466:2-37849).

    Attributes:
        buffer (multiprocessing.Queue): Defines a buffer for the node where actions can be inserted by the RelayLayer.
            Every action in this buffer gets processed by the node.
        relay_layer (RelayLayer): Stores a RelayLayer object for this node.
        running (bool): Stores the state of the node. If True the node is running if False the node is not running and
            all threads are stopped.
        timeout_thread (threading.Thread): Stores the timeout thread where timeout action were inserted in the message
            buffer.
        message_thread (threading.Thread): Stores the message thread of the node. The thread periodically checks if
            there is a new Action in the node_buffer and executes the Action.
        logger (logging.Logger): Stores the Logger class of the Node.
        timeout_period (float): Stores the period of the timeout execution. By default this is set to the value set in
            the ModuleConfig.
        leaving (bool): Stores the leaving state of the node. When set to True the Node changes its behaviour where it
            tries to leave the system by the protocol provided by Setzer.
        in_ref (RelayId): Stores a relay id to the main sink relay of this node.
        N (list): Stores a list which is a pseudo variable for all relays stored in an underlying protocol.
        D (list): Stores a list of RelayIds which should be deleted in the protocol.
        a_out (RelayId): Stores the RelayId of the outgoing ancher.
        a_in (RelayId): Stores the RelayId of the incoming ancher.
        analyse_mode (bool): Handles the analyse mode of the node. When set to True the Node is executed in analyse mode
            and sends every timeout a state to a StateMonitor. By default this is set to False and can be activated
            either in the constructor or by changing the attribute.
    """

    def __init__(self, node_id: int, ip: str, port: int, analyse_mode=False):
        """The constructor creates a Node object with given node id, ip and port.

        When creating a Node object the constructor initializes a RelayLayer and creates all necessary threads.
        All threads are not started until the `start` method is called.

        Args:
            node_id (int): Defines the node id of a node to form a topology.
            ip (str): Defines the ip address where the node should be reachable on.
            port (int): Defines the port where the node should be reachable on.
            analyse_mode (bool, optional): Optionally the analyse mode can be turned on while creating the node. By
                default this parameter is set to False.
        """

        self.buffer = multiprocessing.Queue()

        layer_id = RelayLayerId(ip, port)
        layer_id.node_id = node_id

        self.relay_layer = RelayLayer(layer_id, self.buffer)

        self.running = True
        self.timeout_thread = threading.Thread(target=self._start_timeout_thread, daemon=True)
        self.message_thread = threading.Thread(target=self._start_message_handling, daemon=True)

        self.logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL, "node_{}_{}".format(ip, port))

        self.timeout_period = ModuleConfig.NODE_TIMEOUT_PERIOD

        self.leaving = False

        self.in_ref = None

        self.N = []

        self.D = []

        self.a_out = None

        self.a_in = None

        self.analyse_mode = analyse_mode

        self._context = zmq.Context()

        self.logger.debug("Start node")

    def _start_message_handling(self):
        while self.running:
            try:
                message = self.buffer.get(False, 0.1)
                self.logger.debug("buffer size: {}".format(self.buffer.qsize()))
                if isinstance(message, Action):
                    self.replace_action(message)
                else:
                    self.logger.error("No supported class found for message {}".format(message))
            except queue.Empty:
                pass
        self.logger.debug("Stopped Node Thread")

    def _start_timeout_thread(self):
        while self.running:
            action = Action('timeout', [])
            self.buffer.put(action)
            # self.timeout()
            time.sleep(self.timeout_period)
        self.logger.debug("Stopped Timeout Thread")

    def timeout(self):
        """The Timeout method is called periodically and corrects all values in the node.

        The method is correcting every value in the node or if the node is leaving it will prepare the node for leaving.
        For further information about the protocol see the thesis of this paper.

        """
        self.logger.debug("in_ref " + str(self.in_ref))
        self.logger.debug('a_in ' + str(self.a_in))
        self.logger.debug('a_out ' + str(self.a_out))

        for relay in self.get_relays_from_original_variables():
            if not self.relay_layer.check_relay_exists(relay):
                self.remove_from_original_variables(relay)

        if self.in_ref is not None and not self.relay_layer.check_relay_exists(self.in_ref):
            self.in_ref = None
        if self.a_out is not None and not self.relay_layer.check_relay_exists(self.a_out):
            self.a_out = None
        if self.a_in is not None and not self.relay_layer.check_relay_exists(self.a_in):
            self.a_in = None

        validated_relays = self.relay_layer.get_validated_relays()

        for relay in validated_relays:
            if relay not in self.D and not self.check_in_original_variables(relay) and relay != self.in_ref \
                    and relay != self.a_in and relay != self.a_out:
                self.D.append(relay)
        if self.in_ref is None or not self.relay_layer.is_sink(self.in_ref):
            if self.in_ref is not None:
                self.D.append(self.in_ref)
            self.in_ref = self.relay_layer.new_relay()

        if not self.leaving:

            for relay in self.get_relays_from_original_variables():
                # if not self.call_relay_layer_method("check_direct", True, (relay,)):
                if not self.relay_layer.is_direct(relay):
                    self.remove_from_original_variables(relay)
                    self.D.append(relay)

            if self.a_in is not None:
                self.D.append(self.a_in)
                self.a_in = None

            if self.a_out is not None:
                self.D.append(self.a_out)
                self.a_out = None

            for relay in self.D.copy():
                if not self.relay_layer.has_incoming(relay):
                    self.D.remove(relay)
                    self.reversal_of_relay(relay)
                    self.relay_layer.delete(relay)
            time_start = time.time()
            self.original_timeout()
            self.logger.debug("Orig timeout time: {}".format(time.time() - time_start))

        else:
            for relay in self.get_relays_from_original_variables():
                self.remove_from_original_variables(relay)
                self.D.append(relay)

            if self.a_in is not None and not self.relay_layer.is_sink(self.a_in):
                self.D.append(self.a_in)
                self.a_in = self.relay_layer.new_relay()

            if self.a_in is None:
                self.a_in = self.relay_layer.new_relay()

            if self.a_out is not None:
                if self.relay_layer.has_incoming(self.a_out) or not self.relay_layer.is_direct(self.a_out)\
                        or self.relay_layer.is_sink(self.a_out):
                    self.D.append(self.a_out)
                    self.a_out = None

            for relay in self.D.copy():
                if not self.relay_layer.has_incoming(relay) and relay != self.a_out:
                    action = Action('ask_to_reverse', [self.in_ref])
                    self.relay_layer.send(relay, action)
                    self.D.remove(relay)
                    self.logger.debug("Delete relay: {}".format(relay))
                    self.relay_layer.delete(relay)

            if len(self.D) == 0 and not self.relay_layer.has_incoming(self.in_ref) \
                    and not self.relay_layer.has_incoming(self.a_in):
                self.shutdown()
                return
            self.logger.debug("LEAVING CHECK: {} {} {}".format(self.node_id, len(self.D),
                                                               self.relay_layer.has_incoming(self.in_ref),
                                                               self.relay_layer.has_incoming(self.a_in)))

            if self.a_out is not None:
                if not self.relay_layer.has_incoming(self.a_in):
                    action = Action("ask_to_reverse_anchor", [self.a_in])
                    self.relay_layer.send(self.a_out, action)

                action = Action("notify_anchor", [])
                self.relay_layer.send(self.a_out, action)
        if self.analyse_mode:
            # Send local state to monitor
            done = self.send_analyse_state()
            if done:
                self.shutdown()

    def original_timeout(self):
        """Can be overridden to implement a underlying protocol timeout.

        This method is automatically executed on the end of every timeout of this node class.
        It should be implemented with the timeout of the underlying protocol.

        Example:
            See `SortedListNode`
        """
        pass

    def reversal_of_relay(self, relay_id: RelayId):
        """Should be overridden to reverse a specific relay reference.

        The method should just send a node action from the underlying protocol over the given relay with the in_ref as
        parameter.

        Example:
            See `SortedListNode`.
            Example for the linearize Action

                self.call_method(relay_id, 'linearize', [self.in_ref])

        """
        pass

    def check_in_original_variables(self, relay_id: RelayId):
        """Should be overridden to check if a specific relay is in one of the underlying protocol variables.

        By default it checks if the relay is in the attribute N. This should be changed to a better implementation when
        implementing a underlying protocol.

        Example:
            See `SortedListNode` implementation.

        Args:
            relay_id (RelayId): Defines the RelayId that should be checked.
        """
        return relay_id in self.N

    def remove_from_original_variables(self, relay_id: RelayId):
        """Should be overridden to remove relays from underlying protocol variables.

        By default it removes the given relay id from the N attribute. This should be changed to a better implementation
        when implementing a underlying protocol.

        Example:
            See `SortedListNode` implementation.

        Args:
            relay_id (RelayId): Defines the RelayId that should be removed from the variables.
        """
        self.N.remove(relay_id)

    def get_relays_from_original_variables(self):
        """Should be overridden to get all relays from underlying protocol variables.

        This normally just returns the N attribute list. This should be changed to a better implementation when
        implementing a underlying protocol. It should return a list of RelayIds that are stored in any variable define
        in the protocol.

        Example:
            See `SortedListNode` implementation.

        Returns:
            list: A list of RelayIds containing in variables of the protocol.
        """
        return self.N

    def send_analyse_state(self):
        """Should be overridden to send a state to a StateMonitor.

        In this method a State object should be created. After that it should be sent with the `send_state_to_monitor`
        method.

        Example:
            See `SortedListNode` implementation.
        """
        pass

    def replace_action(self, action):
        """This method is called whenever the node received a Action.

        The method executes the method defined in the received action if the node is staying or reverses the connection
        if the node is leaving.

        Args:
            action (Action): The received action that should be executed.
        """
        # self.logger.debug(f"Calling Replace_action with {action}")
        action_name = action.action_type.lower()
        if not self.leaving or action_name == "timeout" or action_name == "reverse" or action_name == "ask_to_reverse" \
                or action_name == "ask_to_reverse_anchor" or action_name == "notify_anchor":
            action_parameters = action.parameters
            if action_name == "ask_to_reverse_anchor":
                action_parameters.append(action.receiving_relay)
            try:
                action_function = getattr(self, action_name)
            except AttributeError:
                self.logger.warning("Action with action type {} not found!".format(action_name))
            else:
                if callable(action_function):
                    action_function(*action_parameters)
                else:
                    self.logger.warning("Action with action type {} is not callable or a function!".format(action_name))

        else:
            action_parameters = action.parameters
            receiving_relay = action.receiving_relay
            if receiving_relay is None:
                self.logger.warning("Receiving relay was empty for action {}".format(action))
            else:
                for parameter in action_parameters:
                    if isinstance(parameter, RelayId):
                        action = Action('ask_to_reverse', [receiving_relay])
                        self.relay_layer.send(parameter, action)
                        self.relay_layer.delete(parameter)

    def ask_to_reverse(self, out: RelayId):
        """Implements the ask_to_reverse protocol method.

        For further information see the phd thesis.

        Args:
            out (RelayId): Defines the RelayId that should be reversed.
        """
        self.logger.debug("Calling ask_to_reverse with {}".format(out))
        if not self.leaving:
            for v in self.get_relays_from_original_variables():
                if self.relay_layer.same_target(out, v):
                    self.remove_from_original_variables(v)
                    self.D.append(v)
            action = Action("reverse", [self.in_ref])
            # self.call_relay_layer_method("send", True, [out, action])
            # self.call_relay_layer_method("delete", False, [out])
            self.relay_layer.send(out, action)
            self.relay_layer.delete(out)
        else:
            if self.a_out is None:
                action = Action('ask_to_reverse', [self.in_ref])
                # self.call_relay_layer_method("send", True, [out, action])
                # self.call_relay_layer_method("delete", False, [out])
                self.relay_layer.send(out, action)
                self.relay_layer.delete(out)
            else:
                if self.relay_layer.same_target(out, self.a_out):
                    # new_relay = self.call_relay_layer_method("merge", True, [[out, self.a_out]])
                    new_relay = self.relay_layer.merge([self.a_out, out])
                    if new_relay:
                        self.a_out = None
                        action = Action('ask_to_reverse', [self.in_ref])
                        # self.call_relay_layer_method("send", True, [new_relay, action])
                        self.relay_layer.send(new_relay, action)
                    # else:
                    #     self.logger.warning("While merge {} \nand {}\n something went wrong".format(out, self.a_out))
                else:
                    action = Action('reverse', [self.a_out])
                    # self.call_relay_layer_method("send", True, [out, action])
                    # self.call_relay_layer_method("delete", False, [out])
                    self.relay_layer.send(out, action)
                    self.relay_layer.delete(out)

    def ask_to_reverse_anchor(self, out: RelayId, receiving_relay: RelayId):
        """Implements the ask_to_reverse_anchor protocol method.

        For further information see the phd thesis.

        Args:
            out (RelayId): Defines a Relay.
            receiving_relay (RelayId): Defines the relay that received this action.
        """
        self.logger.debug("Calling ask_to_reverse_anchor with {}\n {}".format(out, receiving_relay))
        if not self.leaving:
            action = Action('reverse', [self.in_ref])
            # self.call_relay_layer_method("send", True, [out, action])
            # self.call_relay_layer_method("delete", False, [out])
            self.relay_layer.send(out, action)
            self.relay_layer.delete(out)
        else:
            if receiving_relay is None:
                self.logger.warning("Receiving relay was empty for action ask to reverse anchor")
            else:
                action = Action('ask_to_reverse', [receiving_relay])
                # self.call_relay_layer_method("send", True, [out, action])
                self.relay_layer.send(out, action)
                self.relay_layer.delete(out)

    def notify_anchor(self):
        """Implements the notify_anchor protocol method.

        For further information see the phd thesis.
        """
        self.logger.debug("Calling notify anchor")
        if self.leaving:
            if self.a_in is not None:
                self.D.append(self.a_in)
            # self.a_in = self.call_relay_layer_method("new_relay", True, [True])
            self.a_in = self.relay_layer.new_relay()

    def reverse(self, out: RelayId):
        """Implements the reverse protocol method.

        For further information see the phd thesis.

        Args:
            out (RelayId): Defines a Relay.
        """
        self.logger.debug("Calling reverse with {}".format(out))
        if not self.leaving:
            self.reversal_of_relay(out)
            # self.call_relay_layer_method("delete", False, [out])
            self.relay_layer.delete(out)
        else:
            if self.a_out is None:
                if self.relay_layer.is_dead(out):
                    print("Reverse relay is not alive {}".format(out))
                if self.relay_layer.is_direct(out):
                    self.a_out = out
                    # print(f"OUT: {self.a_out}")
                else:
                    # Ask out to send a direct relay
                    action = Action('ask_to_reverse', [self.in_ref])
                    # self.call_relay_layer_method("send", True, [out, action])
                    # self.call_relay_layer_method("delete", False, [out])
                    self.relay_layer.send(out, action)
                    self.relay_layer.delete(out)
            else:
                if self.relay_layer.same_target(out, self.a_out):
                    start_time = time.time()
                    # merged = self.call_relay_layer_method("merge", True, [[a_out, out]])
                    merged = self.relay_layer.merge([self.a_out, out])
                    if merged:
                        self.a_out = merged
                else:
                    # print("not same target")
                    action = Action('ask_to_reverse', [self.a_out])
                    # self.call_relay_layer_method("send", True, [out, action])
                    # self.call_relay_layer_method("delete", False, [out])
                    self.relay_layer.send(out, action)
                    self.relay_layer.delete(out)

    def call_method(self, relay_id: RelayId, method: str, parameters: list):
        """Calls a method of another node connected with the given relay.

        This method calls the given method with the given parameters in the node that handles the sink relay that is
        connected with the relay defined by the given relay id.
        It calls the send method from the RelayLayer to send the method to the node.

        Args:
            relay_id (RelayId): Defines the Relay which is used to send the action.
            method (str): Defines the name of the method that should be executed in the other node.
            parameters (list): Defines a list of parameters used to execute the method.
        """
        action = Action(method, parameters)

        # self.call_relay_layer_method("send", True, [relay, action])
        result = self.relay_layer.send(relay_id, action)
        if not result:
            self.logger.error("SEND CALL FAILED FOR RELAY {}".format(relay_id))

    def start(self):
        """Starts the node by starting all threads.

        The node gets started by starting the message thread and the timeout thread.
        """
        self.logger.debug("Start node in start")
        self.message_thread.start()
        self.timeout_thread.start()

    def send_state_to_monitor(self, state):
        """Sends the given state to the StateMonitor.

        The method sends the given state to the address defined in the ModuleConfig.

        Args:
            state (Object): Defines the state that should be sent. This should be a object of a state class.
        """
        socket = self._context.socket(zmq.REQ)

        end_point = "tcp://" + ModuleConfig.STATE_MONITOR_ADDRESS

        socket.connect(end_point)
        socket.send_pyobj(state)

        reply = socket.recv_pyobj()
        socket.setsockopt(zmq.LINGER, 0)
        socket.close()
        return reply

    def shutdown(self):
        """Completely shutdown the node.

        The method stops all threads by setting the running state to False.
        After that it stops the RelayLayer and waits until the RelayLayer deleted all Relays.
        """
        # self.logger.warning("Stopping")
        self.running = False
        # print(time.time() - self.start_time)
        if self.analyse_mode:
            self.send_analyse_state()

        self.relay_layer.stop()
        self.relay_layer.timeout_thread.join()
        self.timeout_thread.join()
        self.buffer.close()
        self.buffer.join_thread()
        # self.relay_layer.shutdown()

    def stop(self):
        """Stopping the node so that the node should leave the system.

        The method sets the leaving state to True and activates the leaving protocol of the node.
        """
        self.logger.debug("Leaving the system")
        self.leaving = True

Classes

class Node (node_id: int, ip: str, port: int, analyse_mode=False)

The Node class represents a Node in a distributed System.

The Node class is the main class for creating distributed protocols with the relay model. This class implements functionality to work with the relay model and leave a system without harming its connectivity. It implements the protocol provided by Setzer in his phd thesis: (https://digital.ub.uni-paderborn.de/urn/urn:nbn:de:hbz:466:2-37849).

Attributes

buffer : multiprocessing.Queue
Defines a buffer for the node where actions can be inserted by the RelayLayer. Every action in this buffer gets processed by the node.
relay_layer : RelayLayer
Stores a RelayLayer object for this node.
running : bool
Stores the state of the node. If True the node is running if False the node is not running and all threads are stopped.
timeout_thread : threading.Thread
Stores the timeout thread where timeout action were inserted in the message buffer.
message_thread : threading.Thread
Stores the message thread of the node. The thread periodically checks if there is a new Action in the node_buffer and executes the Action.
logger : logging.Logger
Stores the Logger class of the Node.
timeout_period : float
Stores the period of the timeout execution. By default this is set to the value set in the ModuleConfig.
leaving : bool
Stores the leaving state of the node. When set to True the Node changes its behaviour where it tries to leave the system by the protocol provided by Setzer.
in_ref : RelayId
Stores a relay id to the main sink relay of this node.
N : list
Stores a list which is a pseudo variable for all relays stored in an underlying protocol.
D : list
Stores a list of RelayIds which should be deleted in the protocol.
a_out : RelayId
Stores the RelayId of the outgoing ancher.
a_in : RelayId
Stores the RelayId of the incoming ancher.
analyse_mode : bool
Handles the analyse mode of the node. When set to True the Node is executed in analyse mode and sends every timeout a state to a StateMonitor. By default this is set to False and can be activated either in the constructor or by changing the attribute.

The constructor creates a Node object with given node id, ip and port.

When creating a Node object the constructor initializes a RelayLayer and creates all necessary threads. All threads are not started until the start method is called.

Args

node_id : int
Defines the node id of a node to form a topology.
ip : str
Defines the ip address where the node should be reachable on.
port : int
Defines the port where the node should be reachable on.
analyse_mode : bool, optional
Optionally the analyse mode can be turned on while creating the node. By default this parameter is set to False.
Expand source code
class Node:
    """The Node class represents a Node in a distributed System.

    The Node class is the main class for creating distributed protocols with the relay model.
    This class implements functionality to work with the relay model and leave a system without harming its
    connectivity. It implements the protocol provided by Setzer in his phd thesis:
    (https://digital.ub.uni-paderborn.de/urn/urn:nbn:de:hbz:466:2-37849).

    Attributes:
        buffer (multiprocessing.Queue): Defines a buffer for the node where actions can be inserted by the RelayLayer.
            Every action in this buffer gets processed by the node.
        relay_layer (RelayLayer): Stores a RelayLayer object for this node.
        running (bool): Stores the state of the node. If True the node is running if False the node is not running and
            all threads are stopped.
        timeout_thread (threading.Thread): Stores the timeout thread where timeout action were inserted in the message
            buffer.
        message_thread (threading.Thread): Stores the message thread of the node. The thread periodically checks if
            there is a new Action in the node_buffer and executes the Action.
        logger (logging.Logger): Stores the Logger class of the Node.
        timeout_period (float): Stores the period of the timeout execution. By default this is set to the value set in
            the ModuleConfig.
        leaving (bool): Stores the leaving state of the node. When set to True the Node changes its behaviour where it
            tries to leave the system by the protocol provided by Setzer.
        in_ref (RelayId): Stores a relay id to the main sink relay of this node.
        N (list): Stores a list which is a pseudo variable for all relays stored in an underlying protocol.
        D (list): Stores a list of RelayIds which should be deleted in the protocol.
        a_out (RelayId): Stores the RelayId of the outgoing ancher.
        a_in (RelayId): Stores the RelayId of the incoming ancher.
        analyse_mode (bool): Handles the analyse mode of the node. When set to True the Node is executed in analyse mode
            and sends every timeout a state to a StateMonitor. By default this is set to False and can be activated
            either in the constructor or by changing the attribute.
    """

    def __init__(self, node_id: int, ip: str, port: int, analyse_mode=False):
        """The constructor creates a Node object with given node id, ip and port.

        When creating a Node object the constructor initializes a RelayLayer and creates all necessary threads.
        All threads are not started until the `start` method is called.

        Args:
            node_id (int): Defines the node id of a node to form a topology.
            ip (str): Defines the ip address where the node should be reachable on.
            port (int): Defines the port where the node should be reachable on.
            analyse_mode (bool, optional): Optionally the analyse mode can be turned on while creating the node. By
                default this parameter is set to False.
        """

        self.buffer = multiprocessing.Queue()

        layer_id = RelayLayerId(ip, port)
        layer_id.node_id = node_id

        self.relay_layer = RelayLayer(layer_id, self.buffer)

        self.running = True
        self.timeout_thread = threading.Thread(target=self._start_timeout_thread, daemon=True)
        self.message_thread = threading.Thread(target=self._start_message_handling, daemon=True)

        self.logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL, "node_{}_{}".format(ip, port))

        self.timeout_period = ModuleConfig.NODE_TIMEOUT_PERIOD

        self.leaving = False

        self.in_ref = None

        self.N = []

        self.D = []

        self.a_out = None

        self.a_in = None

        self.analyse_mode = analyse_mode

        self._context = zmq.Context()

        self.logger.debug("Start node")

    def _start_message_handling(self):
        while self.running:
            try:
                message = self.buffer.get(False, 0.1)
                self.logger.debug("buffer size: {}".format(self.buffer.qsize()))
                if isinstance(message, Action):
                    self.replace_action(message)
                else:
                    self.logger.error("No supported class found for message {}".format(message))
            except queue.Empty:
                pass
        self.logger.debug("Stopped Node Thread")

    def _start_timeout_thread(self):
        while self.running:
            action = Action('timeout', [])
            self.buffer.put(action)
            # self.timeout()
            time.sleep(self.timeout_period)
        self.logger.debug("Stopped Timeout Thread")

    def timeout(self):
        """The Timeout method is called periodically and corrects all values in the node.

        The method is correcting every value in the node or if the node is leaving it will prepare the node for leaving.
        For further information about the protocol see the thesis of this paper.

        """
        self.logger.debug("in_ref " + str(self.in_ref))
        self.logger.debug('a_in ' + str(self.a_in))
        self.logger.debug('a_out ' + str(self.a_out))

        for relay in self.get_relays_from_original_variables():
            if not self.relay_layer.check_relay_exists(relay):
                self.remove_from_original_variables(relay)

        if self.in_ref is not None and not self.relay_layer.check_relay_exists(self.in_ref):
            self.in_ref = None
        if self.a_out is not None and not self.relay_layer.check_relay_exists(self.a_out):
            self.a_out = None
        if self.a_in is not None and not self.relay_layer.check_relay_exists(self.a_in):
            self.a_in = None

        validated_relays = self.relay_layer.get_validated_relays()

        for relay in validated_relays:
            if relay not in self.D and not self.check_in_original_variables(relay) and relay != self.in_ref \
                    and relay != self.a_in and relay != self.a_out:
                self.D.append(relay)
        if self.in_ref is None or not self.relay_layer.is_sink(self.in_ref):
            if self.in_ref is not None:
                self.D.append(self.in_ref)
            self.in_ref = self.relay_layer.new_relay()

        if not self.leaving:

            for relay in self.get_relays_from_original_variables():
                # if not self.call_relay_layer_method("check_direct", True, (relay,)):
                if not self.relay_layer.is_direct(relay):
                    self.remove_from_original_variables(relay)
                    self.D.append(relay)

            if self.a_in is not None:
                self.D.append(self.a_in)
                self.a_in = None

            if self.a_out is not None:
                self.D.append(self.a_out)
                self.a_out = None

            for relay in self.D.copy():
                if not self.relay_layer.has_incoming(relay):
                    self.D.remove(relay)
                    self.reversal_of_relay(relay)
                    self.relay_layer.delete(relay)
            time_start = time.time()
            self.original_timeout()
            self.logger.debug("Orig timeout time: {}".format(time.time() - time_start))

        else:
            for relay in self.get_relays_from_original_variables():
                self.remove_from_original_variables(relay)
                self.D.append(relay)

            if self.a_in is not None and not self.relay_layer.is_sink(self.a_in):
                self.D.append(self.a_in)
                self.a_in = self.relay_layer.new_relay()

            if self.a_in is None:
                self.a_in = self.relay_layer.new_relay()

            if self.a_out is not None:
                if self.relay_layer.has_incoming(self.a_out) or not self.relay_layer.is_direct(self.a_out)\
                        or self.relay_layer.is_sink(self.a_out):
                    self.D.append(self.a_out)
                    self.a_out = None

            for relay in self.D.copy():
                if not self.relay_layer.has_incoming(relay) and relay != self.a_out:
                    action = Action('ask_to_reverse', [self.in_ref])
                    self.relay_layer.send(relay, action)
                    self.D.remove(relay)
                    self.logger.debug("Delete relay: {}".format(relay))
                    self.relay_layer.delete(relay)

            if len(self.D) == 0 and not self.relay_layer.has_incoming(self.in_ref) \
                    and not self.relay_layer.has_incoming(self.a_in):
                self.shutdown()
                return
            self.logger.debug("LEAVING CHECK: {} {} {}".format(self.node_id, len(self.D),
                                                               self.relay_layer.has_incoming(self.in_ref),
                                                               self.relay_layer.has_incoming(self.a_in)))

            if self.a_out is not None:
                if not self.relay_layer.has_incoming(self.a_in):
                    action = Action("ask_to_reverse_anchor", [self.a_in])
                    self.relay_layer.send(self.a_out, action)

                action = Action("notify_anchor", [])
                self.relay_layer.send(self.a_out, action)
        if self.analyse_mode:
            # Send local state to monitor
            done = self.send_analyse_state()
            if done:
                self.shutdown()

    def original_timeout(self):
        """Can be overridden to implement a underlying protocol timeout.

        This method is automatically executed on the end of every timeout of this node class.
        It should be implemented with the timeout of the underlying protocol.

        Example:
            See `SortedListNode`
        """
        pass

    def reversal_of_relay(self, relay_id: RelayId):
        """Should be overridden to reverse a specific relay reference.

        The method should just send a node action from the underlying protocol over the given relay with the in_ref as
        parameter.

        Example:
            See `SortedListNode`.
            Example for the linearize Action

                self.call_method(relay_id, 'linearize', [self.in_ref])

        """
        pass

    def check_in_original_variables(self, relay_id: RelayId):
        """Should be overridden to check if a specific relay is in one of the underlying protocol variables.

        By default it checks if the relay is in the attribute N. This should be changed to a better implementation when
        implementing a underlying protocol.

        Example:
            See `SortedListNode` implementation.

        Args:
            relay_id (RelayId): Defines the RelayId that should be checked.
        """
        return relay_id in self.N

    def remove_from_original_variables(self, relay_id: RelayId):
        """Should be overridden to remove relays from underlying protocol variables.

        By default it removes the given relay id from the N attribute. This should be changed to a better implementation
        when implementing a underlying protocol.

        Example:
            See `SortedListNode` implementation.

        Args:
            relay_id (RelayId): Defines the RelayId that should be removed from the variables.
        """
        self.N.remove(relay_id)

    def get_relays_from_original_variables(self):
        """Should be overridden to get all relays from underlying protocol variables.

        This normally just returns the N attribute list. This should be changed to a better implementation when
        implementing a underlying protocol. It should return a list of RelayIds that are stored in any variable define
        in the protocol.

        Example:
            See `SortedListNode` implementation.

        Returns:
            list: A list of RelayIds containing in variables of the protocol.
        """
        return self.N

    def send_analyse_state(self):
        """Should be overridden to send a state to a StateMonitor.

        In this method a State object should be created. After that it should be sent with the `send_state_to_monitor`
        method.

        Example:
            See `SortedListNode` implementation.
        """
        pass

    def replace_action(self, action):
        """This method is called whenever the node received a Action.

        The method executes the method defined in the received action if the node is staying or reverses the connection
        if the node is leaving.

        Args:
            action (Action): The received action that should be executed.
        """
        # self.logger.debug(f"Calling Replace_action with {action}")
        action_name = action.action_type.lower()
        if not self.leaving or action_name == "timeout" or action_name == "reverse" or action_name == "ask_to_reverse" \
                or action_name == "ask_to_reverse_anchor" or action_name == "notify_anchor":
            action_parameters = action.parameters
            if action_name == "ask_to_reverse_anchor":
                action_parameters.append(action.receiving_relay)
            try:
                action_function = getattr(self, action_name)
            except AttributeError:
                self.logger.warning("Action with action type {} not found!".format(action_name))
            else:
                if callable(action_function):
                    action_function(*action_parameters)
                else:
                    self.logger.warning("Action with action type {} is not callable or a function!".format(action_name))

        else:
            action_parameters = action.parameters
            receiving_relay = action.receiving_relay
            if receiving_relay is None:
                self.logger.warning("Receiving relay was empty for action {}".format(action))
            else:
                for parameter in action_parameters:
                    if isinstance(parameter, RelayId):
                        action = Action('ask_to_reverse', [receiving_relay])
                        self.relay_layer.send(parameter, action)
                        self.relay_layer.delete(parameter)

    def ask_to_reverse(self, out: RelayId):
        """Implements the ask_to_reverse protocol method.

        For further information see the phd thesis.

        Args:
            out (RelayId): Defines the RelayId that should be reversed.
        """
        self.logger.debug("Calling ask_to_reverse with {}".format(out))
        if not self.leaving:
            for v in self.get_relays_from_original_variables():
                if self.relay_layer.same_target(out, v):
                    self.remove_from_original_variables(v)
                    self.D.append(v)
            action = Action("reverse", [self.in_ref])
            # self.call_relay_layer_method("send", True, [out, action])
            # self.call_relay_layer_method("delete", False, [out])
            self.relay_layer.send(out, action)
            self.relay_layer.delete(out)
        else:
            if self.a_out is None:
                action = Action('ask_to_reverse', [self.in_ref])
                # self.call_relay_layer_method("send", True, [out, action])
                # self.call_relay_layer_method("delete", False, [out])
                self.relay_layer.send(out, action)
                self.relay_layer.delete(out)
            else:
                if self.relay_layer.same_target(out, self.a_out):
                    # new_relay = self.call_relay_layer_method("merge", True, [[out, self.a_out]])
                    new_relay = self.relay_layer.merge([self.a_out, out])
                    if new_relay:
                        self.a_out = None
                        action = Action('ask_to_reverse', [self.in_ref])
                        # self.call_relay_layer_method("send", True, [new_relay, action])
                        self.relay_layer.send(new_relay, action)
                    # else:
                    #     self.logger.warning("While merge {} \nand {}\n something went wrong".format(out, self.a_out))
                else:
                    action = Action('reverse', [self.a_out])
                    # self.call_relay_layer_method("send", True, [out, action])
                    # self.call_relay_layer_method("delete", False, [out])
                    self.relay_layer.send(out, action)
                    self.relay_layer.delete(out)

    def ask_to_reverse_anchor(self, out: RelayId, receiving_relay: RelayId):
        """Implements the ask_to_reverse_anchor protocol method.

        For further information see the phd thesis.

        Args:
            out (RelayId): Defines a Relay.
            receiving_relay (RelayId): Defines the relay that received this action.
        """
        self.logger.debug("Calling ask_to_reverse_anchor with {}\n {}".format(out, receiving_relay))
        if not self.leaving:
            action = Action('reverse', [self.in_ref])
            # self.call_relay_layer_method("send", True, [out, action])
            # self.call_relay_layer_method("delete", False, [out])
            self.relay_layer.send(out, action)
            self.relay_layer.delete(out)
        else:
            if receiving_relay is None:
                self.logger.warning("Receiving relay was empty for action ask to reverse anchor")
            else:
                action = Action('ask_to_reverse', [receiving_relay])
                # self.call_relay_layer_method("send", True, [out, action])
                self.relay_layer.send(out, action)
                self.relay_layer.delete(out)

    def notify_anchor(self):
        """Implements the notify_anchor protocol method.

        For further information see the phd thesis.
        """
        self.logger.debug("Calling notify anchor")
        if self.leaving:
            if self.a_in is not None:
                self.D.append(self.a_in)
            # self.a_in = self.call_relay_layer_method("new_relay", True, [True])
            self.a_in = self.relay_layer.new_relay()

    def reverse(self, out: RelayId):
        """Implements the reverse protocol method.

        For further information see the phd thesis.

        Args:
            out (RelayId): Defines a Relay.
        """
        self.logger.debug("Calling reverse with {}".format(out))
        if not self.leaving:
            self.reversal_of_relay(out)
            # self.call_relay_layer_method("delete", False, [out])
            self.relay_layer.delete(out)
        else:
            if self.a_out is None:
                if self.relay_layer.is_dead(out):
                    print("Reverse relay is not alive {}".format(out))
                if self.relay_layer.is_direct(out):
                    self.a_out = out
                    # print(f"OUT: {self.a_out}")
                else:
                    # Ask out to send a direct relay
                    action = Action('ask_to_reverse', [self.in_ref])
                    # self.call_relay_layer_method("send", True, [out, action])
                    # self.call_relay_layer_method("delete", False, [out])
                    self.relay_layer.send(out, action)
                    self.relay_layer.delete(out)
            else:
                if self.relay_layer.same_target(out, self.a_out):
                    start_time = time.time()
                    # merged = self.call_relay_layer_method("merge", True, [[a_out, out]])
                    merged = self.relay_layer.merge([self.a_out, out])
                    if merged:
                        self.a_out = merged
                else:
                    # print("not same target")
                    action = Action('ask_to_reverse', [self.a_out])
                    # self.call_relay_layer_method("send", True, [out, action])
                    # self.call_relay_layer_method("delete", False, [out])
                    self.relay_layer.send(out, action)
                    self.relay_layer.delete(out)

    def call_method(self, relay_id: RelayId, method: str, parameters: list):
        """Calls a method of another node connected with the given relay.

        This method calls the given method with the given parameters in the node that handles the sink relay that is
        connected with the relay defined by the given relay id.
        It calls the send method from the RelayLayer to send the method to the node.

        Args:
            relay_id (RelayId): Defines the Relay which is used to send the action.
            method (str): Defines the name of the method that should be executed in the other node.
            parameters (list): Defines a list of parameters used to execute the method.
        """
        action = Action(method, parameters)

        # self.call_relay_layer_method("send", True, [relay, action])
        result = self.relay_layer.send(relay_id, action)
        if not result:
            self.logger.error("SEND CALL FAILED FOR RELAY {}".format(relay_id))

    def start(self):
        """Starts the node by starting all threads.

        The node gets started by starting the message thread and the timeout thread.
        """
        self.logger.debug("Start node in start")
        self.message_thread.start()
        self.timeout_thread.start()

    def send_state_to_monitor(self, state):
        """Sends the given state to the StateMonitor.

        The method sends the given state to the address defined in the ModuleConfig.

        Args:
            state (Object): Defines the state that should be sent. This should be a object of a state class.
        """
        socket = self._context.socket(zmq.REQ)

        end_point = "tcp://" + ModuleConfig.STATE_MONITOR_ADDRESS

        socket.connect(end_point)
        socket.send_pyobj(state)

        reply = socket.recv_pyobj()
        socket.setsockopt(zmq.LINGER, 0)
        socket.close()
        return reply

    def shutdown(self):
        """Completely shutdown the node.

        The method stops all threads by setting the running state to False.
        After that it stops the RelayLayer and waits until the RelayLayer deleted all Relays.
        """
        # self.logger.warning("Stopping")
        self.running = False
        # print(time.time() - self.start_time)
        if self.analyse_mode:
            self.send_analyse_state()

        self.relay_layer.stop()
        self.relay_layer.timeout_thread.join()
        self.timeout_thread.join()
        self.buffer.close()
        self.buffer.join_thread()
        # self.relay_layer.shutdown()

    def stop(self):
        """Stopping the node so that the node should leave the system.

        The method sets the leaving state to True and activates the leaving protocol of the node.
        """
        self.logger.debug("Leaving the system")
        self.leaving = True

Subclasses

Methods

def ask_to_reverse(self, out: RelayId)

Implements the ask_to_reverse protocol method.

For further information see the phd thesis.

Args

out : RelayId
Defines the RelayId that should be reversed.
Expand source code
def ask_to_reverse(self, out: RelayId):
    """Implements the ask_to_reverse protocol method.

    For further information see the phd thesis.

    Args:
        out (RelayId): Defines the RelayId that should be reversed.
    """
    self.logger.debug("Calling ask_to_reverse with {}".format(out))
    if not self.leaving:
        for v in self.get_relays_from_original_variables():
            if self.relay_layer.same_target(out, v):
                self.remove_from_original_variables(v)
                self.D.append(v)
        action = Action("reverse", [self.in_ref])
        # self.call_relay_layer_method("send", True, [out, action])
        # self.call_relay_layer_method("delete", False, [out])
        self.relay_layer.send(out, action)
        self.relay_layer.delete(out)
    else:
        if self.a_out is None:
            action = Action('ask_to_reverse', [self.in_ref])
            # self.call_relay_layer_method("send", True, [out, action])
            # self.call_relay_layer_method("delete", False, [out])
            self.relay_layer.send(out, action)
            self.relay_layer.delete(out)
        else:
            if self.relay_layer.same_target(out, self.a_out):
                # new_relay = self.call_relay_layer_method("merge", True, [[out, self.a_out]])
                new_relay = self.relay_layer.merge([self.a_out, out])
                if new_relay:
                    self.a_out = None
                    action = Action('ask_to_reverse', [self.in_ref])
                    # self.call_relay_layer_method("send", True, [new_relay, action])
                    self.relay_layer.send(new_relay, action)
                # else:
                #     self.logger.warning("While merge {} \nand {}\n something went wrong".format(out, self.a_out))
            else:
                action = Action('reverse', [self.a_out])
                # self.call_relay_layer_method("send", True, [out, action])
                # self.call_relay_layer_method("delete", False, [out])
                self.relay_layer.send(out, action)
                self.relay_layer.delete(out)
def ask_to_reverse_anchor(self, out: RelayId, receiving_relay: RelayId)

Implements the ask_to_reverse_anchor protocol method.

For further information see the phd thesis.

Args

out : RelayId
Defines a Relay.
receiving_relay : RelayId
Defines the relay that received this action.
Expand source code
def ask_to_reverse_anchor(self, out: RelayId, receiving_relay: RelayId):
    """Implements the ask_to_reverse_anchor protocol method.

    For further information see the phd thesis.

    Args:
        out (RelayId): Defines a Relay.
        receiving_relay (RelayId): Defines the relay that received this action.
    """
    self.logger.debug("Calling ask_to_reverse_anchor with {}\n {}".format(out, receiving_relay))
    if not self.leaving:
        action = Action('reverse', [self.in_ref])
        # self.call_relay_layer_method("send", True, [out, action])
        # self.call_relay_layer_method("delete", False, [out])
        self.relay_layer.send(out, action)
        self.relay_layer.delete(out)
    else:
        if receiving_relay is None:
            self.logger.warning("Receiving relay was empty for action ask to reverse anchor")
        else:
            action = Action('ask_to_reverse', [receiving_relay])
            # self.call_relay_layer_method("send", True, [out, action])
            self.relay_layer.send(out, action)
            self.relay_layer.delete(out)
def call_method(self, relay_id: RelayId, method: str, parameters: list)

Calls a method of another node connected with the given relay.

This method calls the given method with the given parameters in the node that handles the sink relay that is connected with the relay defined by the given relay id. It calls the send method from the RelayLayer to send the method to the node.

Args

relay_id : RelayId
Defines the Relay which is used to send the action.
method : str
Defines the name of the method that should be executed in the other node.
parameters : list
Defines a list of parameters used to execute the method.
Expand source code
def call_method(self, relay_id: RelayId, method: str, parameters: list):
    """Calls a method of another node connected with the given relay.

    This method calls the given method with the given parameters in the node that handles the sink relay that is
    connected with the relay defined by the given relay id.
    It calls the send method from the RelayLayer to send the method to the node.

    Args:
        relay_id (RelayId): Defines the Relay which is used to send the action.
        method (str): Defines the name of the method that should be executed in the other node.
        parameters (list): Defines a list of parameters used to execute the method.
    """
    action = Action(method, parameters)

    # self.call_relay_layer_method("send", True, [relay, action])
    result = self.relay_layer.send(relay_id, action)
    if not result:
        self.logger.error("SEND CALL FAILED FOR RELAY {}".format(relay_id))
def check_in_original_variables(self, relay_id: RelayId)

Should be overridden to check if a specific relay is in one of the underlying protocol variables.

By default it checks if the relay is in the attribute N. This should be changed to a better implementation when implementing a underlying protocol.

Example

See SortedListNode implementation.

Args

relay_id : RelayId
Defines the RelayId that should be checked.
Expand source code
def check_in_original_variables(self, relay_id: RelayId):
    """Should be overridden to check if a specific relay is in one of the underlying protocol variables.

    By default it checks if the relay is in the attribute N. This should be changed to a better implementation when
    implementing a underlying protocol.

    Example:
        See `SortedListNode` implementation.

    Args:
        relay_id (RelayId): Defines the RelayId that should be checked.
    """
    return relay_id in self.N
def get_relays_from_original_variables(self)

Should be overridden to get all relays from underlying protocol variables.

This normally just returns the N attribute list. This should be changed to a better implementation when implementing a underlying protocol. It should return a list of RelayIds that are stored in any variable define in the protocol.

Example

See SortedListNode implementation.

Returns

list
A list of RelayIds containing in variables of the protocol.
Expand source code
def get_relays_from_original_variables(self):
    """Should be overridden to get all relays from underlying protocol variables.

    This normally just returns the N attribute list. This should be changed to a better implementation when
    implementing a underlying protocol. It should return a list of RelayIds that are stored in any variable define
    in the protocol.

    Example:
        See `SortedListNode` implementation.

    Returns:
        list: A list of RelayIds containing in variables of the protocol.
    """
    return self.N
def notify_anchor(self)

Implements the notify_anchor protocol method.

For further information see the phd thesis.

Expand source code
def notify_anchor(self):
    """Implements the notify_anchor protocol method.

    For further information see the phd thesis.
    """
    self.logger.debug("Calling notify anchor")
    if self.leaving:
        if self.a_in is not None:
            self.D.append(self.a_in)
        # self.a_in = self.call_relay_layer_method("new_relay", True, [True])
        self.a_in = self.relay_layer.new_relay()
def original_timeout(self)

Can be overridden to implement a underlying protocol timeout.

This method is automatically executed on the end of every timeout of this node class. It should be implemented with the timeout of the underlying protocol.

Example

See SortedListNode

Expand source code
def original_timeout(self):
    """Can be overridden to implement a underlying protocol timeout.

    This method is automatically executed on the end of every timeout of this node class.
    It should be implemented with the timeout of the underlying protocol.

    Example:
        See `SortedListNode`
    """
    pass
def remove_from_original_variables(self, relay_id: RelayId)

Should be overridden to remove relays from underlying protocol variables.

By default it removes the given relay id from the N attribute. This should be changed to a better implementation when implementing a underlying protocol.

Example

See SortedListNode implementation.

Args

relay_id : RelayId
Defines the RelayId that should be removed from the variables.
Expand source code
def remove_from_original_variables(self, relay_id: RelayId):
    """Should be overridden to remove relays from underlying protocol variables.

    By default it removes the given relay id from the N attribute. This should be changed to a better implementation
    when implementing a underlying protocol.

    Example:
        See `SortedListNode` implementation.

    Args:
        relay_id (RelayId): Defines the RelayId that should be removed from the variables.
    """
    self.N.remove(relay_id)
def replace_action(self, action)

This method is called whenever the node received a Action.

The method executes the method defined in the received action if the node is staying or reverses the connection if the node is leaving.

Args

action : Action
The received action that should be executed.
Expand source code
def replace_action(self, action):
    """This method is called whenever the node received a Action.

    The method executes the method defined in the received action if the node is staying or reverses the connection
    if the node is leaving.

    Args:
        action (Action): The received action that should be executed.
    """
    # self.logger.debug(f"Calling Replace_action with {action}")
    action_name = action.action_type.lower()
    if not self.leaving or action_name == "timeout" or action_name == "reverse" or action_name == "ask_to_reverse" \
            or action_name == "ask_to_reverse_anchor" or action_name == "notify_anchor":
        action_parameters = action.parameters
        if action_name == "ask_to_reverse_anchor":
            action_parameters.append(action.receiving_relay)
        try:
            action_function = getattr(self, action_name)
        except AttributeError:
            self.logger.warning("Action with action type {} not found!".format(action_name))
        else:
            if callable(action_function):
                action_function(*action_parameters)
            else:
                self.logger.warning("Action with action type {} is not callable or a function!".format(action_name))

    else:
        action_parameters = action.parameters
        receiving_relay = action.receiving_relay
        if receiving_relay is None:
            self.logger.warning("Receiving relay was empty for action {}".format(action))
        else:
            for parameter in action_parameters:
                if isinstance(parameter, RelayId):
                    action = Action('ask_to_reverse', [receiving_relay])
                    self.relay_layer.send(parameter, action)
                    self.relay_layer.delete(parameter)
def reversal_of_relay(self, relay_id: RelayId)

Should be overridden to reverse a specific relay reference.

The method should just send a node action from the underlying protocol over the given relay with the in_ref as parameter.

Example

See SortedListNode. Example for the linearize Action

self.call_method(relay_id, 'linearize', [self.in_ref])
Expand source code
def reversal_of_relay(self, relay_id: RelayId):
    """Should be overridden to reverse a specific relay reference.

    The method should just send a node action from the underlying protocol over the given relay with the in_ref as
    parameter.

    Example:
        See `SortedListNode`.
        Example for the linearize Action

            self.call_method(relay_id, 'linearize', [self.in_ref])

    """
    pass
def reverse(self, out: RelayId)

Implements the reverse protocol method.

For further information see the phd thesis.

Args

out : RelayId
Defines a Relay.
Expand source code
def reverse(self, out: RelayId):
    """Implements the reverse protocol method.

    For further information see the phd thesis.

    Args:
        out (RelayId): Defines a Relay.
    """
    self.logger.debug("Calling reverse with {}".format(out))
    if not self.leaving:
        self.reversal_of_relay(out)
        # self.call_relay_layer_method("delete", False, [out])
        self.relay_layer.delete(out)
    else:
        if self.a_out is None:
            if self.relay_layer.is_dead(out):
                print("Reverse relay is not alive {}".format(out))
            if self.relay_layer.is_direct(out):
                self.a_out = out
                # print(f"OUT: {self.a_out}")
            else:
                # Ask out to send a direct relay
                action = Action('ask_to_reverse', [self.in_ref])
                # self.call_relay_layer_method("send", True, [out, action])
                # self.call_relay_layer_method("delete", False, [out])
                self.relay_layer.send(out, action)
                self.relay_layer.delete(out)
        else:
            if self.relay_layer.same_target(out, self.a_out):
                start_time = time.time()
                # merged = self.call_relay_layer_method("merge", True, [[a_out, out]])
                merged = self.relay_layer.merge([self.a_out, out])
                if merged:
                    self.a_out = merged
            else:
                # print("not same target")
                action = Action('ask_to_reverse', [self.a_out])
                # self.call_relay_layer_method("send", True, [out, action])
                # self.call_relay_layer_method("delete", False, [out])
                self.relay_layer.send(out, action)
                self.relay_layer.delete(out)
def send_analyse_state(self)

Should be overridden to send a state to a StateMonitor.

In this method a State object should be created. After that it should be sent with the send_state_to_monitor method.

Example

See SortedListNode implementation.

Expand source code
def send_analyse_state(self):
    """Should be overridden to send a state to a StateMonitor.

    In this method a State object should be created. After that it should be sent with the `send_state_to_monitor`
    method.

    Example:
        See `SortedListNode` implementation.
    """
    pass
def send_state_to_monitor(self, state)

Sends the given state to the StateMonitor.

The method sends the given state to the address defined in the ModuleConfig.

Args

state : Object
Defines the state that should be sent. This should be a object of a state class.
Expand source code
def send_state_to_monitor(self, state):
    """Sends the given state to the StateMonitor.

    The method sends the given state to the address defined in the ModuleConfig.

    Args:
        state (Object): Defines the state that should be sent. This should be a object of a state class.
    """
    socket = self._context.socket(zmq.REQ)

    end_point = "tcp://" + ModuleConfig.STATE_MONITOR_ADDRESS

    socket.connect(end_point)
    socket.send_pyobj(state)

    reply = socket.recv_pyobj()
    socket.setsockopt(zmq.LINGER, 0)
    socket.close()
    return reply
def shutdown(self)

Completely shutdown the node.

The method stops all threads by setting the running state to False. After that it stops the RelayLayer and waits until the RelayLayer deleted all Relays.

Expand source code
def shutdown(self):
    """Completely shutdown the node.

    The method stops all threads by setting the running state to False.
    After that it stops the RelayLayer and waits until the RelayLayer deleted all Relays.
    """
    # self.logger.warning("Stopping")
    self.running = False
    # print(time.time() - self.start_time)
    if self.analyse_mode:
        self.send_analyse_state()

    self.relay_layer.stop()
    self.relay_layer.timeout_thread.join()
    self.timeout_thread.join()
    self.buffer.close()
    self.buffer.join_thread()
def start(self)

Starts the node by starting all threads.

The node gets started by starting the message thread and the timeout thread.

Expand source code
def start(self):
    """Starts the node by starting all threads.

    The node gets started by starting the message thread and the timeout thread.
    """
    self.logger.debug("Start node in start")
    self.message_thread.start()
    self.timeout_thread.start()
def stop(self)

Stopping the node so that the node should leave the system.

The method sets the leaving state to True and activates the leaving protocol of the node.

Expand source code
def stop(self):
    """Stopping the node so that the node should leave the system.

    The method sets the leaving state to True and activates the leaving protocol of the node.
    """
    self.logger.debug("Leaving the system")
    self.leaving = True
def timeout(self)

The Timeout method is called periodically and corrects all values in the node.

The method is correcting every value in the node or if the node is leaving it will prepare the node for leaving. For further information about the protocol see the thesis of this paper.

Expand source code
def timeout(self):
    """The Timeout method is called periodically and corrects all values in the node.

    The method is correcting every value in the node or if the node is leaving it will prepare the node for leaving.
    For further information about the protocol see the thesis of this paper.

    """
    self.logger.debug("in_ref " + str(self.in_ref))
    self.logger.debug('a_in ' + str(self.a_in))
    self.logger.debug('a_out ' + str(self.a_out))

    for relay in self.get_relays_from_original_variables():
        if not self.relay_layer.check_relay_exists(relay):
            self.remove_from_original_variables(relay)

    if self.in_ref is not None and not self.relay_layer.check_relay_exists(self.in_ref):
        self.in_ref = None
    if self.a_out is not None and not self.relay_layer.check_relay_exists(self.a_out):
        self.a_out = None
    if self.a_in is not None and not self.relay_layer.check_relay_exists(self.a_in):
        self.a_in = None

    validated_relays = self.relay_layer.get_validated_relays()

    for relay in validated_relays:
        if relay not in self.D and not self.check_in_original_variables(relay) and relay != self.in_ref \
                and relay != self.a_in and relay != self.a_out:
            self.D.append(relay)
    if self.in_ref is None or not self.relay_layer.is_sink(self.in_ref):
        if self.in_ref is not None:
            self.D.append(self.in_ref)
        self.in_ref = self.relay_layer.new_relay()

    if not self.leaving:

        for relay in self.get_relays_from_original_variables():
            # if not self.call_relay_layer_method("check_direct", True, (relay,)):
            if not self.relay_layer.is_direct(relay):
                self.remove_from_original_variables(relay)
                self.D.append(relay)

        if self.a_in is not None:
            self.D.append(self.a_in)
            self.a_in = None

        if self.a_out is not None:
            self.D.append(self.a_out)
            self.a_out = None

        for relay in self.D.copy():
            if not self.relay_layer.has_incoming(relay):
                self.D.remove(relay)
                self.reversal_of_relay(relay)
                self.relay_layer.delete(relay)
        time_start = time.time()
        self.original_timeout()
        self.logger.debug("Orig timeout time: {}".format(time.time() - time_start))

    else:
        for relay in self.get_relays_from_original_variables():
            self.remove_from_original_variables(relay)
            self.D.append(relay)

        if self.a_in is not None and not self.relay_layer.is_sink(self.a_in):
            self.D.append(self.a_in)
            self.a_in = self.relay_layer.new_relay()

        if self.a_in is None:
            self.a_in = self.relay_layer.new_relay()

        if self.a_out is not None:
            if self.relay_layer.has_incoming(self.a_out) or not self.relay_layer.is_direct(self.a_out)\
                    or self.relay_layer.is_sink(self.a_out):
                self.D.append(self.a_out)
                self.a_out = None

        for relay in self.D.copy():
            if not self.relay_layer.has_incoming(relay) and relay != self.a_out:
                action = Action('ask_to_reverse', [self.in_ref])
                self.relay_layer.send(relay, action)
                self.D.remove(relay)
                self.logger.debug("Delete relay: {}".format(relay))
                self.relay_layer.delete(relay)

        if len(self.D) == 0 and not self.relay_layer.has_incoming(self.in_ref) \
                and not self.relay_layer.has_incoming(self.a_in):
            self.shutdown()
            return
        self.logger.debug("LEAVING CHECK: {} {} {}".format(self.node_id, len(self.D),
                                                           self.relay_layer.has_incoming(self.in_ref),
                                                           self.relay_layer.has_incoming(self.a_in)))

        if self.a_out is not None:
            if not self.relay_layer.has_incoming(self.a_in):
                action = Action("ask_to_reverse_anchor", [self.a_in])
                self.relay_layer.send(self.a_out, action)

            action = Action("notify_anchor", [])
            self.relay_layer.send(self.a_out, action)
    if self.analyse_mode:
        # Send local state to monitor
        done = self.send_analyse_state()
        if done:
            self.shutdown()