Module RelayModel.LinkLayer

Expand source code
import collections
import multiprocessing
import queue
import threading
import time
import zmq
from RelayModel import RelayLogging, ModuleConfig
from RelayModel.Communication import SuccessMessage, FailureMessage, LayerMessage, TransmitMessage, Action, \
    ProbeAction, ProbeFailAction, NotAuthorizedAction, PingAction, OutRelayClosedAction, InRelayClosedAction, Message
from RelayModel.Relay import Relay
from RelayModel.RelayId import RelayLayerId


def send_message(context, layer_id, message):
    """Used to send a message to a specific link layer.

    This message can be used to send a message to another link layer using the lazy pirate pattern.
    It creates a socket in the given context and tries to send the given message to the given layer id.

    Args:
        context (zmq.Context): Defines the context where the socket should created in.
        layer_id (RelayLayerId): Defines the RID of the LinkLayer that needs to receive the message.
        message (Object): The message that should be send. Normally this should be a TransmitMessage object
            or a LayerMessage. Otherwise the other LinkLayer will deny the message. But in general every object can be
            sent over the socket.

    Returns:
        bool: True if the message got accepted or is received by the link layer, False if the Message was denied or
            could not be sent.
    """
    # Send normally over sockets
    socket = context.socket(zmq.REQ)

    end_point = "tcp://" + str(layer_id.ip) + ":" + str(layer_id.port)
    socket.connect(end_point)
    socket.send_pyobj(message)

    retries = ModuleConfig.POLL_TRIES

    while True:
        if (socket.poll(ModuleConfig.POLL_TIMEOUT) & zmq.POLLIN) != 0:
            reply = socket.recv_pyobj()

            socket.setsockopt(zmq.LINGER, 0)
            socket.close()
            if isinstance(reply, FailureMessage):
                return False
            else:
                return True

        retries -= 1

        socket.setsockopt(zmq.LINGER, 0)
        socket.close()

        if retries == 0:
            return False
        socket = context.socket(zmq.REQ)
        socket.connect(end_point)
        socket.send_pyobj(message)


def start_link_layer(link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe):
    """Creates a link layer and waits until it got stopped.

    This function should be called with a new process.

    Example:
        A basic example is given here. It creates a Process and starts it afterwards

            process = multiprocessing.Process(target=start_link_layer, args=(link_layer_buffer, message_buffer, node_queue, layer_id.ip, layer_id.port, link_layer_pipe))
            process.start()

    Args:
        All arguments are the same as the LinkLayer constructor arguments.
        For further information see `LinkLayer` class docs.

    """
    link_layer = LinkLayer(link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe)

    while not link_layer.stopped:
        time.sleep(0.1)
    # link_layer._logger.debug("End funktion start link layer")


def start_layer_buffer_watch(layer_id, buffer, relay_layer_buffer, running):
    """Starts a watch over a RelayLayer buffer.

    It watches a given buffer and if there occurs a message it tries to send it to the right endpoint.
    This method should be called in a seperated process.
    The function runs as long the running state is equal to 1 and tries to get a message from the buffer.

    Args:
        layer_id (RelayLayerId): Defines the layer id of the relay layer that registered in this function. This is only
            needed for creating a logger file.
        buffer (multiprocessing.Queue): Defines the buffer that should be watched as a multiprocessing queue.
        relay_layer_buffer (multiprocessing.Queue): Defines the message buffer of a RelayLayer. This buffer is needed
            to send messages to the same RelayLayer as registered.
        running (multiprocessing.Value): Defines the running state of the function. If this value is set to 0 the
            function stops. This is used to stop the function within a process from a parent process.
    """
    logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL,
                                     "link_layer_watch_layer_buffer_{}_{}".format(layer_id.ip, layer_id.port))
    logger.debug("Start relay layer buffer watch for {}".format(layer_id))
    context = zmq.Context()
    while running.value == 1:
        try:
            message = buffer.get(True, 0.01)
        except queue.Empty:
            time.sleep(0.1)
        else:
            logger.debug("Layer buffer size: {}".format(buffer.qsize()))
            if isinstance(message, LayerMessage):
                if message.layer_id == layer_id:
                    # logger.debug("Message should be sent to own layer. Send to node!\n")

                    action = message.action

                    # check if action is an internal action
                    if isinstance(action, ProbeAction) or isinstance(action, ProbeFailAction) \
                            or isinstance(action, NotAuthorizedAction) or isinstance(action, PingAction) \
                            or isinstance(action, InRelayClosedAction) or isinstance(action, OutRelayClosedAction):
                        try:
                            relay_layer_buffer.put_nowait(message)
                        except BrokenPipeError:
                            print("BrokenPipeError in RelayLayerWatch first")

                    else:
                        logger.warning("Message with action {} is not an internal action and will not be "
                                       "processed".format(action))
                else:
                    result = send_message(context, message.layer_id, message)
                    if not result:
                        if ModuleConfig.CONSIDER_AS_CLOSED:
                            if isinstance(message.action, PingAction):
                                for ping in message.action.pings:
                                    in_relay_closed_action = InRelayClosedAction([ping.key], message.layer_id,
                                                                                 ping.relay_id)
                                    layer_message = LayerMessage(message.layer_id, in_relay_closed_action)
                                    try:
                                        relay_layer_buffer.put_nowait(layer_message)
                                    except BrokenPipeError:
                                        print("BrokenPipeError in RelayLAyerwatch second")
                        logger.warning("Message could not be transmitted to layer {}".format(message))
            else:
                logger.warning("Message is not in right format\n")
    # logger.debug("End watching layer")


def start_listening(listen_ip, listen_port, relay_layer_buffer, running):
    """Starts listening on a specific ip and port and can be used to receive messages on this address.

    The function creates a listening socket on the given address and tries, as long the running state is 1, to receive
    messages from it. When a message is received it will get checked if it is a LayerMessage or a TransmitMessage.
    If the format is correct it sends a `SuccessMessage` back to the sending LinkLayer. Otherwise it sends a
    `FailureMessage` back. If the message is a valid Message the message will be provided to the RelayLayer of this
    LinkLayer. This is done by inserting the message in the given RelayLayer message buffer.

    Args:
        listen_ip (str): The ip the socket should be listen on. This is only used for the logger file name. The socket
            normally listens on every interface on a specific port.
        listen_port (int): Defines the port the socket should listen on.
        relay_layer_buffer (multiprocessing.Queue): Provides the message buffer of a RelayLayer. It is used to send
            received messages to the RelayLayer so it can process the message.
        running (multiprocessing.Value): Defines the running state of the function. If this value is set to 0 the
            function stops. This is used to stop the function within a process from a parent process.
    """
    context = zmq.Context()
    listening_socket = context.socket(zmq.REP)
    endpoint_string = "tcp://*:" + str(listen_port)
    listening_socket.bind(endpoint_string)

    logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL,
                                     'link_layer_listening_socket_{}_{}'.format(listen_ip, listen_port))

    logger.debug(f"Start listening thread for ip: {listen_ip} and port: {listen_port}")
    while running.value == 1:
        if (listening_socket.poll(ModuleConfig.POLL_TIMEOUT) & zmq.POLLIN) != 0:
            message = listening_socket.recv_pyobj()

            logger.debug(f"{listen_ip}:{listen_port}: Message received: {message}")

            if isinstance(message, LayerMessage) or isinstance(message, TransmitMessage):
                return_message = SuccessMessage("Success")
                listening_socket.send_pyobj(return_message)
                try:
                    relay_layer_buffer.put_nowait(message)
                except BrokenPipeError:
                    print("Broken Pipe Error in listening thread")

            else:
                return_message = FailureMessage("Failure")
                listening_socket.send_pyobj(return_message)

    logger.debug(f"Stopped listening thread for ip: {listen_ip} and port: {listen_port}")


class LinkLayer:
    """The LinkLayer class provides the functionality to send and receive messages to other nodes.

    The LinkLayer watches message buffers from the RelayLayer and Relays and sends every message to the right endpoint.
    For further information about the functionality see the thesis of this library.

    Attributes:
        running (bool): Defines the running state of the LinkLayer. If this is False all Thread should be stopped.
        stopped (bool): Defines the state if the LinkLayer is stopped. It is only set to True if the LinkLayer
            completely stopped all threads and processes.
    """
    def __init__(self, link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe):
        """Creates a LinkLayer object.

        Which threads and processes the LinkLayer starts is described in the thesis of this library.

        Args:
            link_layer_buffer (multiprocessing.Queue): Defines the buffer where messages can be inserted. This messages
                will be inserted from the LinkLayer in the Relay or RelayLayer buffers. Only tuples should be inserted.
                The first entry of the tuple should either be a Relay object or a RelayLayerId. This depends on the
                buffer the message in the second entry of the tuple should be inserted into.
            relay_layer_buffer (multiprocessing.Queue): Defines the RelayLayer message queue. This buffer is used to
                send messages to the RelayLayer that should be processed by it.
            node_buffer (multiprocessing.Queue): Defines the Node message queue. This buffer is use to send messages to
                the underlying node. This messages are mostly `Action` messages.
            listen_ip (str): Defines the ip the LinkLayer should be reachable on.
            listen_port (int): Defines the port the LinkLayer should be reachable on.
            pipe (multiprocessing.Pipe): Defines a Pipe that is used to call methods in this LinkLayer from another
                Process. There should only occur Triples of data where the first entry defines the method name. The
                second entry defines if there is a response needed as a bool and the third entry is a list of arguments
                needed to execute the method. E.g. ('is_buffer_empty', True, [relay_id])
        """
        self._buffer = link_layer_buffer
        self._node_buffer = node_buffer
        self._relay_layer_buffer = relay_layer_buffer
        self._listen_ip = listen_ip
        self._listen_port = listen_port
        self._relay_running_states = {}
        self.running = True
        self.stopped = False
        self._processes_running = multiprocessing.Value("i", 1)

        self._listening_process = None

        self._layer_buffer = multiprocessing.Queue()
        self._layer_process = None

        self._relay_buffer = {}

        self._context = zmq.Context()

        self._method_call_pipe = pipe

        self._logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL,
                                               "link_layer_{}_{}".format(listen_ip, listen_port))

        self._worker_thread = threading.Thread(target=self._start_worker_thread, daemon=True)
        self._worker_thread.start()

        self._method_calling_thread = threading.Thread(target=self._start_method_calling_thread, daemon=True)
        self._method_calling_thread.start()

        self._relay_buffer_watch_thread = threading.Thread(target=self._watch_relay_buffers, daemon=True)
        self._relay_buffer_watch_thread.start()

        self._init_socket()

    def _start_worker_thread(self):

        while self.running:
            try:
                information = self._buffer.get(True, 0.01)
                self._logger.debug("Link layer buffer size: {}".format(self._buffer.qsize()))
            except queue.Empty:
                time.sleep(0.01)
            else:
                key = information[0]
                message = information[1]
                # self._logger.debug("Got message {}".format(information))
                if isinstance(key, RelayLayerId):
                    self._layer_buffer.put_nowait(message)
                elif isinstance(key, Relay):
                    if key.relay_id in self._relay_buffer:
                        self._relay_buffer[key.relay_id][0] = key
                        self._relay_buffer[key.relay_id][1].append(message)
                    elif key.relay_id not in self._relay_running_states:
                        self._relay_running_states[key.relay_id] = True
                        self._relay_buffer[key.relay_id] = [key, collections.deque()]
                        self._relay_buffer[key.relay_id][1].append(message)
        # self._logger.warning("Stopped Worker Thread")

    def _watch_relay_buffers(self):
        self._logger.debug("Start relay buffer watches")
        while self.running:
            for buffer_list in self._relay_buffer.copy().values():
                relay = buffer_list[0]
                relay_id = relay.relay_id
                buffer = buffer_list[1]
                if len(buffer) > 0:
                    send_messages = []
                    self._logger.debug("Relay buffer size for relay {}: {}".format(relay_id, len(buffer)))
                    while len(buffer) > 0:
                        send_messages.append(buffer.popleft())

                    out_id = relay.out_relay.out_id
                    for message in send_messages:
                        if not self.running:
                            break
                        # logger.debug("Message received: {}".format(message))
                        if isinstance(message, TransmitMessage):
                            if out_id is not None:
                                # logger.debug(f"Message is TransmitMessage and should be transmitted to a relay layer with id "
                                #              f"{out_id}\n")

                                if out_id.layer_id.ip == relay_id.layer_id.ip and out_id.layer_id.port == relay_id.layer_id.port:
                                    # logger.debug(f"Receiving message: {message}\n")
                                    message.receiving_relay = relay_id
                                    self._relay_layer_buffer.put_nowait(message)
                                    # else:
                                    #     logger.error(
                                    #         "Not supported message type {} was received by layer".format(message.__class__))
                                else:
                                    result = send_message(self._context, out_id.layer_id, message)
                                    if not result:
                                        if ModuleConfig.CONSIDER_AS_CLOSED:
                                            out_relay_closed = OutRelayClosedAction(out_id)
                                            layer_message = LayerMessage(relay_id.layer_id, out_relay_closed)
                                            self._relay_layer_buffer.put_nowait(layer_message)
                                        self._logger.error("Message discarded: {}".format(message))
                            else:
                                message.receiving_relay = relay_id
                                self._relay_layer_buffer.put_nowait(message)
                        elif isinstance(message, Action):
                            # logger.debug("Action provided")
                            message.receiving_relay = relay_id
                            self._node_buffer.put_nowait(message)
                        else:
                            self._logger.debug("Message is not in right format")
                else:
                    if not self._relay_running_states[relay_id]:
                        self._relay_buffer.pop(relay_id)
            time.sleep(0.01)
        # print("Stopping relay watch end process")
        # self._logger.warning("Stopped watching relay buffers")

    def _start_method_calling_thread(self):
        self._logger.debug("Start method calling thread")
        while self.running:
            try:
                msg = self._method_call_pipe.recv()
                start_time = time.time()

                method = msg[0]
                needs_response = msg[1]
                try:
                    args = list(msg[2])
                except TypeError:
                    print(method)
                    raise TypeError

                # self._logger.debug("Should call {}".format(method))
                action_function = getattr(self, method)

                if callable(action_function):
                    result = action_function(*args)
                    # self._logger.debug(f"Called {method} got {result}")
                    if needs_response:
                        self._method_call_pipe.send(result)
            except EOFError:
                self._logger.error("EOF ERROR")
                break
            except BrokenPipeError:
                self._logger.error("Broken Pipe ERROR")
                break
        # self._logger.warning("Stopped Method Calling Thread")

    def _init_socket(self):
        process = multiprocessing.Process(target=start_listening, daemon=True, args=(self._listen_ip, self._listen_port,
                                                                                     self._relay_layer_buffer,
                                                                                     self._processes_running))
        self._listening_process = process

        self._listening_process.start()
        # print("LINK LAYER START LISTENING {} {}".format(self._listen_port, self._listening_process.pid))

    def is_buffer_empty(self, relay_id):
        """Checks if a buffer for a given relay id defining a relay is empty.

        The method checks if there is a buffer for the given relay id and checks if it is empty.
        This method should be called with the method calling pipe.

        Args:
            relay_id (RelayId): The RelayId that defines the Relay which buffer should be checked.

        Returns:
            bool: True if buffer is not existent or empty. False if the buffer has an entry.
        """
        if relay_id in self._relay_buffer:
            return len(self._relay_buffer[relay_id]) == 0
        else:
            return True

    def check_key_in_relay_buffer(self, relay_id, check_key):
        """Checks the buffer of a given Relay if there is a key in one of the Actions.

        First the method checks if the buffer exists for the given relay. If there is a buffer it checks if the buffer
        is empty. Both conditions return False. After that the message checks all messages in the buffer if there is a
        key set in one parameter set as a RelayParameter.
        This method should be called with the method calling pipe.

        Args:
            relay_id (RelayId): Defines the RelayId of the Relay which buffer should be checked.
            check_key (str): Defines the key that should be checked.

        Returns:
            bool: True if there is the given key in one RelayParameter in the buffer of the given Relay. False otherwise
        """
        if relay_id in self._relay_buffer:
            if len(self._relay_buffer[relay_id]) == 0:
                return False
            else:
                for message in self._relay_buffer[relay_id].copy():
                    if isinstance(message, Message) \
                            and message.action.check_has_key_as_parameter(check_key):
                        return True
                    elif isinstance(message, TransmitMessage) \
                            and message.message.action.check_has_key_as_parameter(check_key):
                        return True
                    elif isinstance(message, Action) and message.check_has_key_as_parameter(check_key):
                        return True
                return False
        else:
            return False

    def register_relay_layer(self, layer_id):
        """Registers a RelayLayer and creates a buffer watch process.

        The LinkLayer creates the buffer watch process for the relay layer. It only creates the process if it is not set
        yet.

        Args:
            layer_id (RelayLayerId): Defines the RelayLayerId of the RelayLayer which buffer should be watched.
        """
        # print("registered relay layer {}".format(layer_id))
        if self._layer_process is None:
            # thread = threading.Thread(target=self._watch_relay_layer_buffer, args=(layer_id,), daemon=True)

            # self._layer_process = thread
            # thread.start()layer_id, buffer, node_buffer, running
            process = multiprocessing.Process(target=start_layer_buffer_watch, daemon=True,
                                              args=(layer_id, self._layer_buffer,
                                                    self._relay_layer_buffer,
                                                    self._processes_running))
            self._layer_process = process
            self._layer_process.start()
            # print("WATCH RELAY LAYER BUFFER {} {}".format(layer_id, self._layer_process.pid))
            #
            # self._worker_queue.append(process)
            # print("Register Relay {}".format(process.name))
        else:
            self._logger.warning("Current Link layer already watches a relay layer")

    def shutdown(self):
        """Shutdowns the LinkLayer completely.

        It first shutdowns all Threads and then all subprocesses.
        This method should be called over the method call pipe.
        """
        # self._logger.debug("Shutdown Linklayer {}".format(self._listen_port))

        self._processes_running.value = 0
        self.running = False
        # print("Layer process: {}".format(self._layer_process.pid))
        # print("Shutdown LinkLayer")
        # self._logger.debug("Shutdown relay thread")
        self._relay_buffer_watch_thread.join()
        # print("Terminate LinkLayer layer process")
        self._layer_process.join(1)
        if self._layer_process.exitcode is None:
            self._layer_process.kill()
        # self._logger.debug("Shutdown layer process")
        # print("Terminate LinkLayer listening process")
        if self._listening_process is not None:
            # print("Listening process: {}".format(self._listening_process.pid))
            self._listening_process.join(1)
            # self._logger.debug("EXITCODE OF LISTENING PROCESS: ".format(self._listening_process.exitcode))
            if self._listening_process.exitcode is None:
                self._listening_process.kill()
        # self._logger.debug("Shutdown listening process")
        # print("Shutdown link layer")
        for running in self._relay_running_states.values():
            running = False
        self.stopped = True
        # print("Done Shutdown")
        # self._logger.debug("Done shutdown Linklayer {}".format(self._listen_port))

    def stop_relay_watch(self, relay_id):
        """Stops a watch of a relay buffer.

        The method stops the watch from a relay buffer.
        This method should be called over the method call pipe.

        Args:
            relay_id (RelayId): The RelayId of the Relay which buffer watch should be stopped.
        """
        if relay_id in self._relay_running_states:
            # print("Stopping relay watch", relay_id, self.__threads[relay_id].pid)
            self._relay_running_states[relay_id] = False
            # print(self.__running_states[relay_id].value)

            self._logger.debug("Process stopped for relay {}".format(relay_id))

Functions

def send_message(context, layer_id, message)

Used to send a message to a specific link layer.

This message can be used to send a message to another link layer using the lazy pirate pattern. It creates a socket in the given context and tries to send the given message to the given layer id.

Args

context : zmq.Context
Defines the context where the socket should created in.
layer_id : RelayLayerId
Defines the RID of the LinkLayer that needs to receive the message.
message : Object
The message that should be send. Normally this should be a TransmitMessage object or a LayerMessage. Otherwise the other LinkLayer will deny the message. But in general every object can be sent over the socket.

Returns

bool
True if the message got accepted or is received by the link layer, False if the Message was denied or could not be sent.
Expand source code
def send_message(context, layer_id, message):
    """Used to send a message to a specific link layer.

    This message can be used to send a message to another link layer using the lazy pirate pattern.
    It creates a socket in the given context and tries to send the given message to the given layer id.

    Args:
        context (zmq.Context): Defines the context where the socket should created in.
        layer_id (RelayLayerId): Defines the RID of the LinkLayer that needs to receive the message.
        message (Object): The message that should be send. Normally this should be a TransmitMessage object
            or a LayerMessage. Otherwise the other LinkLayer will deny the message. But in general every object can be
            sent over the socket.

    Returns:
        bool: True if the message got accepted or is received by the link layer, False if the Message was denied or
            could not be sent.
    """
    # Send normally over sockets
    socket = context.socket(zmq.REQ)

    end_point = "tcp://" + str(layer_id.ip) + ":" + str(layer_id.port)
    socket.connect(end_point)
    socket.send_pyobj(message)

    retries = ModuleConfig.POLL_TRIES

    while True:
        if (socket.poll(ModuleConfig.POLL_TIMEOUT) & zmq.POLLIN) != 0:
            reply = socket.recv_pyobj()

            socket.setsockopt(zmq.LINGER, 0)
            socket.close()
            if isinstance(reply, FailureMessage):
                return False
            else:
                return True

        retries -= 1

        socket.setsockopt(zmq.LINGER, 0)
        socket.close()

        if retries == 0:
            return False
        socket = context.socket(zmq.REQ)
        socket.connect(end_point)
        socket.send_pyobj(message)
def start_layer_buffer_watch(layer_id, buffer, relay_layer_buffer, running)

Starts a watch over a RelayLayer buffer.

It watches a given buffer and if there occurs a message it tries to send it to the right endpoint. This method should be called in a seperated process. The function runs as long the running state is equal to 1 and tries to get a message from the buffer.

Args

layer_id : RelayLayerId
Defines the layer id of the relay layer that registered in this function. This is only needed for creating a logger file.
buffer : multiprocessing.Queue
Defines the buffer that should be watched as a multiprocessing queue.
relay_layer_buffer : multiprocessing.Queue
Defines the message buffer of a RelayLayer. This buffer is needed to send messages to the same RelayLayer as registered.
running : multiprocessing.Value
Defines the running state of the function. If this value is set to 0 the function stops. This is used to stop the function within a process from a parent process.
Expand source code
def start_layer_buffer_watch(layer_id, buffer, relay_layer_buffer, running):
    """Starts a watch over a RelayLayer buffer.

    It watches a given buffer and if there occurs a message it tries to send it to the right endpoint.
    This method should be called in a seperated process.
    The function runs as long the running state is equal to 1 and tries to get a message from the buffer.

    Args:
        layer_id (RelayLayerId): Defines the layer id of the relay layer that registered in this function. This is only
            needed for creating a logger file.
        buffer (multiprocessing.Queue): Defines the buffer that should be watched as a multiprocessing queue.
        relay_layer_buffer (multiprocessing.Queue): Defines the message buffer of a RelayLayer. This buffer is needed
            to send messages to the same RelayLayer as registered.
        running (multiprocessing.Value): Defines the running state of the function. If this value is set to 0 the
            function stops. This is used to stop the function within a process from a parent process.
    """
    logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL,
                                     "link_layer_watch_layer_buffer_{}_{}".format(layer_id.ip, layer_id.port))
    logger.debug("Start relay layer buffer watch for {}".format(layer_id))
    context = zmq.Context()
    while running.value == 1:
        try:
            message = buffer.get(True, 0.01)
        except queue.Empty:
            time.sleep(0.1)
        else:
            logger.debug("Layer buffer size: {}".format(buffer.qsize()))
            if isinstance(message, LayerMessage):
                if message.layer_id == layer_id:
                    # logger.debug("Message should be sent to own layer. Send to node!\n")

                    action = message.action

                    # check if action is an internal action
                    if isinstance(action, ProbeAction) or isinstance(action, ProbeFailAction) \
                            or isinstance(action, NotAuthorizedAction) or isinstance(action, PingAction) \
                            or isinstance(action, InRelayClosedAction) or isinstance(action, OutRelayClosedAction):
                        try:
                            relay_layer_buffer.put_nowait(message)
                        except BrokenPipeError:
                            print("BrokenPipeError in RelayLayerWatch first")

                    else:
                        logger.warning("Message with action {} is not an internal action and will not be "
                                       "processed".format(action))
                else:
                    result = send_message(context, message.layer_id, message)
                    if not result:
                        if ModuleConfig.CONSIDER_AS_CLOSED:
                            if isinstance(message.action, PingAction):
                                for ping in message.action.pings:
                                    in_relay_closed_action = InRelayClosedAction([ping.key], message.layer_id,
                                                                                 ping.relay_id)
                                    layer_message = LayerMessage(message.layer_id, in_relay_closed_action)
                                    try:
                                        relay_layer_buffer.put_nowait(layer_message)
                                    except BrokenPipeError:
                                        print("BrokenPipeError in RelayLAyerwatch second")
                        logger.warning("Message could not be transmitted to layer {}".format(message))
            else:
                logger.warning("Message is not in right format\n")

Creates a link layer and waits until it got stopped.

This function should be called with a new process.

Example

A basic example is given here. It creates a Process and starts it afterwards

process = multiprocessing.Process(target=start_link_layer, args=(link_layer_buffer, message_buffer, node_queue, layer_id.ip, layer_id.port, link_layer_pipe))
process.start()

Args

All arguments are the same as the LinkLayer constructor arguments. For further information see LinkLayer class docs.

Expand source code
def start_link_layer(link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe):
    """Creates a link layer and waits until it got stopped.

    This function should be called with a new process.

    Example:
        A basic example is given here. It creates a Process and starts it afterwards

            process = multiprocessing.Process(target=start_link_layer, args=(link_layer_buffer, message_buffer, node_queue, layer_id.ip, layer_id.port, link_layer_pipe))
            process.start()

    Args:
        All arguments are the same as the LinkLayer constructor arguments.
        For further information see `LinkLayer` class docs.

    """
    link_layer = LinkLayer(link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe)

    while not link_layer.stopped:
        time.sleep(0.1)
def start_listening(listen_ip, listen_port, relay_layer_buffer, running)

Starts listening on a specific ip and port and can be used to receive messages on this address.

The function creates a listening socket on the given address and tries, as long the running state is 1, to receive messages from it. When a message is received it will get checked if it is a LayerMessage or a TransmitMessage. If the format is correct it sends a SuccessMessage back to the sending LinkLayer. Otherwise it sends a FailureMessage back. If the message is a valid Message the message will be provided to the RelayLayer of this LinkLayer. This is done by inserting the message in the given RelayLayer message buffer.

Args

listen_ip : str
The ip the socket should be listen on. This is only used for the logger file name. The socket normally listens on every interface on a specific port.
listen_port : int
Defines the port the socket should listen on.
relay_layer_buffer : multiprocessing.Queue
Provides the message buffer of a RelayLayer. It is used to send received messages to the RelayLayer so it can process the message.
running : multiprocessing.Value
Defines the running state of the function. If this value is set to 0 the function stops. This is used to stop the function within a process from a parent process.
Expand source code
def start_listening(listen_ip, listen_port, relay_layer_buffer, running):
    """Starts listening on a specific ip and port and can be used to receive messages on this address.

    The function creates a listening socket on the given address and tries, as long the running state is 1, to receive
    messages from it. When a message is received it will get checked if it is a LayerMessage or a TransmitMessage.
    If the format is correct it sends a `SuccessMessage` back to the sending LinkLayer. Otherwise it sends a
    `FailureMessage` back. If the message is a valid Message the message will be provided to the RelayLayer of this
    LinkLayer. This is done by inserting the message in the given RelayLayer message buffer.

    Args:
        listen_ip (str): The ip the socket should be listen on. This is only used for the logger file name. The socket
            normally listens on every interface on a specific port.
        listen_port (int): Defines the port the socket should listen on.
        relay_layer_buffer (multiprocessing.Queue): Provides the message buffer of a RelayLayer. It is used to send
            received messages to the RelayLayer so it can process the message.
        running (multiprocessing.Value): Defines the running state of the function. If this value is set to 0 the
            function stops. This is used to stop the function within a process from a parent process.
    """
    context = zmq.Context()
    listening_socket = context.socket(zmq.REP)
    endpoint_string = "tcp://*:" + str(listen_port)
    listening_socket.bind(endpoint_string)

    logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL,
                                     'link_layer_listening_socket_{}_{}'.format(listen_ip, listen_port))

    logger.debug(f"Start listening thread for ip: {listen_ip} and port: {listen_port}")
    while running.value == 1:
        if (listening_socket.poll(ModuleConfig.POLL_TIMEOUT) & zmq.POLLIN) != 0:
            message = listening_socket.recv_pyobj()

            logger.debug(f"{listen_ip}:{listen_port}: Message received: {message}")

            if isinstance(message, LayerMessage) or isinstance(message, TransmitMessage):
                return_message = SuccessMessage("Success")
                listening_socket.send_pyobj(return_message)
                try:
                    relay_layer_buffer.put_nowait(message)
                except BrokenPipeError:
                    print("Broken Pipe Error in listening thread")

            else:
                return_message = FailureMessage("Failure")
                listening_socket.send_pyobj(return_message)

    logger.debug(f"Stopped listening thread for ip: {listen_ip} and port: {listen_port}")

Classes

class LinkLayer (link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe)

The LinkLayer class provides the functionality to send and receive messages to other nodes.

The LinkLayer watches message buffers from the RelayLayer and Relays and sends every message to the right endpoint. For further information about the functionality see the thesis of this library.

Attributes

running : bool
Defines the running state of the LinkLayer. If this is False all Thread should be stopped.
stopped : bool
Defines the state if the LinkLayer is stopped. It is only set to True if the LinkLayer completely stopped all threads and processes.

Creates a LinkLayer object.

Which threads and processes the LinkLayer starts is described in the thesis of this library.

Args

link_layer_buffer : multiprocessing.Queue
Defines the buffer where messages can be inserted. This messages will be inserted from the LinkLayer in the Relay or RelayLayer buffers. Only tuples should be inserted. The first entry of the tuple should either be a Relay object or a RelayLayerId. This depends on the buffer the message in the second entry of the tuple should be inserted into.
relay_layer_buffer : multiprocessing.Queue
Defines the RelayLayer message queue. This buffer is used to send messages to the RelayLayer that should be processed by it.
node_buffer : multiprocessing.Queue
Defines the Node message queue. This buffer is use to send messages to the underlying node. This messages are mostly Action messages.
listen_ip : str
Defines the ip the LinkLayer should be reachable on.
listen_port : int
Defines the port the LinkLayer should be reachable on.
pipe : multiprocessing.Pipe
Defines a Pipe that is used to call methods in this LinkLayer from another Process. There should only occur Triples of data where the first entry defines the method name. The second entry defines if there is a response needed as a bool and the third entry is a list of arguments needed to execute the method. E.g. ('is_buffer_empty', True, [relay_id])
Expand source code
class LinkLayer:
    """The LinkLayer class provides the functionality to send and receive messages to other nodes.

    The LinkLayer watches message buffers from the RelayLayer and Relays and sends every message to the right endpoint.
    For further information about the functionality see the thesis of this library.

    Attributes:
        running (bool): Defines the running state of the LinkLayer. If this is False all Thread should be stopped.
        stopped (bool): Defines the state if the LinkLayer is stopped. It is only set to True if the LinkLayer
            completely stopped all threads and processes.
    """
    def __init__(self, link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe):
        """Creates a LinkLayer object.

        Which threads and processes the LinkLayer starts is described in the thesis of this library.

        Args:
            link_layer_buffer (multiprocessing.Queue): Defines the buffer where messages can be inserted. This messages
                will be inserted from the LinkLayer in the Relay or RelayLayer buffers. Only tuples should be inserted.
                The first entry of the tuple should either be a Relay object or a RelayLayerId. This depends on the
                buffer the message in the second entry of the tuple should be inserted into.
            relay_layer_buffer (multiprocessing.Queue): Defines the RelayLayer message queue. This buffer is used to
                send messages to the RelayLayer that should be processed by it.
            node_buffer (multiprocessing.Queue): Defines the Node message queue. This buffer is use to send messages to
                the underlying node. This messages are mostly `Action` messages.
            listen_ip (str): Defines the ip the LinkLayer should be reachable on.
            listen_port (int): Defines the port the LinkLayer should be reachable on.
            pipe (multiprocessing.Pipe): Defines a Pipe that is used to call methods in this LinkLayer from another
                Process. There should only occur Triples of data where the first entry defines the method name. The
                second entry defines if there is a response needed as a bool and the third entry is a list of arguments
                needed to execute the method. E.g. ('is_buffer_empty', True, [relay_id])
        """
        self._buffer = link_layer_buffer
        self._node_buffer = node_buffer
        self._relay_layer_buffer = relay_layer_buffer
        self._listen_ip = listen_ip
        self._listen_port = listen_port
        self._relay_running_states = {}
        self.running = True
        self.stopped = False
        self._processes_running = multiprocessing.Value("i", 1)

        self._listening_process = None

        self._layer_buffer = multiprocessing.Queue()
        self._layer_process = None

        self._relay_buffer = {}

        self._context = zmq.Context()

        self._method_call_pipe = pipe

        self._logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL,
                                               "link_layer_{}_{}".format(listen_ip, listen_port))

        self._worker_thread = threading.Thread(target=self._start_worker_thread, daemon=True)
        self._worker_thread.start()

        self._method_calling_thread = threading.Thread(target=self._start_method_calling_thread, daemon=True)
        self._method_calling_thread.start()

        self._relay_buffer_watch_thread = threading.Thread(target=self._watch_relay_buffers, daemon=True)
        self._relay_buffer_watch_thread.start()

        self._init_socket()

    def _start_worker_thread(self):

        while self.running:
            try:
                information = self._buffer.get(True, 0.01)
                self._logger.debug("Link layer buffer size: {}".format(self._buffer.qsize()))
            except queue.Empty:
                time.sleep(0.01)
            else:
                key = information[0]
                message = information[1]
                # self._logger.debug("Got message {}".format(information))
                if isinstance(key, RelayLayerId):
                    self._layer_buffer.put_nowait(message)
                elif isinstance(key, Relay):
                    if key.relay_id in self._relay_buffer:
                        self._relay_buffer[key.relay_id][0] = key
                        self._relay_buffer[key.relay_id][1].append(message)
                    elif key.relay_id not in self._relay_running_states:
                        self._relay_running_states[key.relay_id] = True
                        self._relay_buffer[key.relay_id] = [key, collections.deque()]
                        self._relay_buffer[key.relay_id][1].append(message)
        # self._logger.warning("Stopped Worker Thread")

    def _watch_relay_buffers(self):
        self._logger.debug("Start relay buffer watches")
        while self.running:
            for buffer_list in self._relay_buffer.copy().values():
                relay = buffer_list[0]
                relay_id = relay.relay_id
                buffer = buffer_list[1]
                if len(buffer) > 0:
                    send_messages = []
                    self._logger.debug("Relay buffer size for relay {}: {}".format(relay_id, len(buffer)))
                    while len(buffer) > 0:
                        send_messages.append(buffer.popleft())

                    out_id = relay.out_relay.out_id
                    for message in send_messages:
                        if not self.running:
                            break
                        # logger.debug("Message received: {}".format(message))
                        if isinstance(message, TransmitMessage):
                            if out_id is not None:
                                # logger.debug(f"Message is TransmitMessage and should be transmitted to a relay layer with id "
                                #              f"{out_id}\n")

                                if out_id.layer_id.ip == relay_id.layer_id.ip and out_id.layer_id.port == relay_id.layer_id.port:
                                    # logger.debug(f"Receiving message: {message}\n")
                                    message.receiving_relay = relay_id
                                    self._relay_layer_buffer.put_nowait(message)
                                    # else:
                                    #     logger.error(
                                    #         "Not supported message type {} was received by layer".format(message.__class__))
                                else:
                                    result = send_message(self._context, out_id.layer_id, message)
                                    if not result:
                                        if ModuleConfig.CONSIDER_AS_CLOSED:
                                            out_relay_closed = OutRelayClosedAction(out_id)
                                            layer_message = LayerMessage(relay_id.layer_id, out_relay_closed)
                                            self._relay_layer_buffer.put_nowait(layer_message)
                                        self._logger.error("Message discarded: {}".format(message))
                            else:
                                message.receiving_relay = relay_id
                                self._relay_layer_buffer.put_nowait(message)
                        elif isinstance(message, Action):
                            # logger.debug("Action provided")
                            message.receiving_relay = relay_id
                            self._node_buffer.put_nowait(message)
                        else:
                            self._logger.debug("Message is not in right format")
                else:
                    if not self._relay_running_states[relay_id]:
                        self._relay_buffer.pop(relay_id)
            time.sleep(0.01)
        # print("Stopping relay watch end process")
        # self._logger.warning("Stopped watching relay buffers")

    def _start_method_calling_thread(self):
        self._logger.debug("Start method calling thread")
        while self.running:
            try:
                msg = self._method_call_pipe.recv()
                start_time = time.time()

                method = msg[0]
                needs_response = msg[1]
                try:
                    args = list(msg[2])
                except TypeError:
                    print(method)
                    raise TypeError

                # self._logger.debug("Should call {}".format(method))
                action_function = getattr(self, method)

                if callable(action_function):
                    result = action_function(*args)
                    # self._logger.debug(f"Called {method} got {result}")
                    if needs_response:
                        self._method_call_pipe.send(result)
            except EOFError:
                self._logger.error("EOF ERROR")
                break
            except BrokenPipeError:
                self._logger.error("Broken Pipe ERROR")
                break
        # self._logger.warning("Stopped Method Calling Thread")

    def _init_socket(self):
        process = multiprocessing.Process(target=start_listening, daemon=True, args=(self._listen_ip, self._listen_port,
                                                                                     self._relay_layer_buffer,
                                                                                     self._processes_running))
        self._listening_process = process

        self._listening_process.start()
        # print("LINK LAYER START LISTENING {} {}".format(self._listen_port, self._listening_process.pid))

    def is_buffer_empty(self, relay_id):
        """Checks if a buffer for a given relay id defining a relay is empty.

        The method checks if there is a buffer for the given relay id and checks if it is empty.
        This method should be called with the method calling pipe.

        Args:
            relay_id (RelayId): The RelayId that defines the Relay which buffer should be checked.

        Returns:
            bool: True if buffer is not existent or empty. False if the buffer has an entry.
        """
        if relay_id in self._relay_buffer:
            return len(self._relay_buffer[relay_id]) == 0
        else:
            return True

    def check_key_in_relay_buffer(self, relay_id, check_key):
        """Checks the buffer of a given Relay if there is a key in one of the Actions.

        First the method checks if the buffer exists for the given relay. If there is a buffer it checks if the buffer
        is empty. Both conditions return False. After that the message checks all messages in the buffer if there is a
        key set in one parameter set as a RelayParameter.
        This method should be called with the method calling pipe.

        Args:
            relay_id (RelayId): Defines the RelayId of the Relay which buffer should be checked.
            check_key (str): Defines the key that should be checked.

        Returns:
            bool: True if there is the given key in one RelayParameter in the buffer of the given Relay. False otherwise
        """
        if relay_id in self._relay_buffer:
            if len(self._relay_buffer[relay_id]) == 0:
                return False
            else:
                for message in self._relay_buffer[relay_id].copy():
                    if isinstance(message, Message) \
                            and message.action.check_has_key_as_parameter(check_key):
                        return True
                    elif isinstance(message, TransmitMessage) \
                            and message.message.action.check_has_key_as_parameter(check_key):
                        return True
                    elif isinstance(message, Action) and message.check_has_key_as_parameter(check_key):
                        return True
                return False
        else:
            return False

    def register_relay_layer(self, layer_id):
        """Registers a RelayLayer and creates a buffer watch process.

        The LinkLayer creates the buffer watch process for the relay layer. It only creates the process if it is not set
        yet.

        Args:
            layer_id (RelayLayerId): Defines the RelayLayerId of the RelayLayer which buffer should be watched.
        """
        # print("registered relay layer {}".format(layer_id))
        if self._layer_process is None:
            # thread = threading.Thread(target=self._watch_relay_layer_buffer, args=(layer_id,), daemon=True)

            # self._layer_process = thread
            # thread.start()layer_id, buffer, node_buffer, running
            process = multiprocessing.Process(target=start_layer_buffer_watch, daemon=True,
                                              args=(layer_id, self._layer_buffer,
                                                    self._relay_layer_buffer,
                                                    self._processes_running))
            self._layer_process = process
            self._layer_process.start()
            # print("WATCH RELAY LAYER BUFFER {} {}".format(layer_id, self._layer_process.pid))
            #
            # self._worker_queue.append(process)
            # print("Register Relay {}".format(process.name))
        else:
            self._logger.warning("Current Link layer already watches a relay layer")

    def shutdown(self):
        """Shutdowns the LinkLayer completely.

        It first shutdowns all Threads and then all subprocesses.
        This method should be called over the method call pipe.
        """
        # self._logger.debug("Shutdown Linklayer {}".format(self._listen_port))

        self._processes_running.value = 0
        self.running = False
        # print("Layer process: {}".format(self._layer_process.pid))
        # print("Shutdown LinkLayer")
        # self._logger.debug("Shutdown relay thread")
        self._relay_buffer_watch_thread.join()
        # print("Terminate LinkLayer layer process")
        self._layer_process.join(1)
        if self._layer_process.exitcode is None:
            self._layer_process.kill()
        # self._logger.debug("Shutdown layer process")
        # print("Terminate LinkLayer listening process")
        if self._listening_process is not None:
            # print("Listening process: {}".format(self._listening_process.pid))
            self._listening_process.join(1)
            # self._logger.debug("EXITCODE OF LISTENING PROCESS: ".format(self._listening_process.exitcode))
            if self._listening_process.exitcode is None:
                self._listening_process.kill()
        # self._logger.debug("Shutdown listening process")
        # print("Shutdown link layer")
        for running in self._relay_running_states.values():
            running = False
        self.stopped = True
        # print("Done Shutdown")
        # self._logger.debug("Done shutdown Linklayer {}".format(self._listen_port))

    def stop_relay_watch(self, relay_id):
        """Stops a watch of a relay buffer.

        The method stops the watch from a relay buffer.
        This method should be called over the method call pipe.

        Args:
            relay_id (RelayId): The RelayId of the Relay which buffer watch should be stopped.
        """
        if relay_id in self._relay_running_states:
            # print("Stopping relay watch", relay_id, self.__threads[relay_id].pid)
            self._relay_running_states[relay_id] = False
            # print(self.__running_states[relay_id].value)

            self._logger.debug("Process stopped for relay {}".format(relay_id))

Methods

def check_key_in_relay_buffer(self, relay_id, check_key)

Checks the buffer of a given Relay if there is a key in one of the Actions.

First the method checks if the buffer exists for the given relay. If there is a buffer it checks if the buffer is empty. Both conditions return False. After that the message checks all messages in the buffer if there is a key set in one parameter set as a RelayParameter. This method should be called with the method calling pipe.

Args

relay_id : RelayId
Defines the RelayId of the Relay which buffer should be checked.
check_key : str
Defines the key that should be checked.

Returns

bool
True if there is the given key in one RelayParameter in the buffer of the given Relay. False otherwise
Expand source code
def check_key_in_relay_buffer(self, relay_id, check_key):
    """Checks the buffer of a given Relay if there is a key in one of the Actions.

    First the method checks if the buffer exists for the given relay. If there is a buffer it checks if the buffer
    is empty. Both conditions return False. After that the message checks all messages in the buffer if there is a
    key set in one parameter set as a RelayParameter.
    This method should be called with the method calling pipe.

    Args:
        relay_id (RelayId): Defines the RelayId of the Relay which buffer should be checked.
        check_key (str): Defines the key that should be checked.

    Returns:
        bool: True if there is the given key in one RelayParameter in the buffer of the given Relay. False otherwise
    """
    if relay_id in self._relay_buffer:
        if len(self._relay_buffer[relay_id]) == 0:
            return False
        else:
            for message in self._relay_buffer[relay_id].copy():
                if isinstance(message, Message) \
                        and message.action.check_has_key_as_parameter(check_key):
                    return True
                elif isinstance(message, TransmitMessage) \
                        and message.message.action.check_has_key_as_parameter(check_key):
                    return True
                elif isinstance(message, Action) and message.check_has_key_as_parameter(check_key):
                    return True
            return False
    else:
        return False
def is_buffer_empty(self, relay_id)

Checks if a buffer for a given relay id defining a relay is empty.

The method checks if there is a buffer for the given relay id and checks if it is empty. This method should be called with the method calling pipe.

Args

relay_id : RelayId
The RelayId that defines the Relay which buffer should be checked.

Returns

bool
True if buffer is not existent or empty. False if the buffer has an entry.
Expand source code
def is_buffer_empty(self, relay_id):
    """Checks if a buffer for a given relay id defining a relay is empty.

    The method checks if there is a buffer for the given relay id and checks if it is empty.
    This method should be called with the method calling pipe.

    Args:
        relay_id (RelayId): The RelayId that defines the Relay which buffer should be checked.

    Returns:
        bool: True if buffer is not existent or empty. False if the buffer has an entry.
    """
    if relay_id in self._relay_buffer:
        return len(self._relay_buffer[relay_id]) == 0
    else:
        return True
def register_relay_layer(self, layer_id)

Registers a RelayLayer and creates a buffer watch process.

The LinkLayer creates the buffer watch process for the relay layer. It only creates the process if it is not set yet.

Args

layer_id : RelayLayerId
Defines the RelayLayerId of the RelayLayer which buffer should be watched.
Expand source code
def register_relay_layer(self, layer_id):
    """Registers a RelayLayer and creates a buffer watch process.

    The LinkLayer creates the buffer watch process for the relay layer. It only creates the process if it is not set
    yet.

    Args:
        layer_id (RelayLayerId): Defines the RelayLayerId of the RelayLayer which buffer should be watched.
    """
    # print("registered relay layer {}".format(layer_id))
    if self._layer_process is None:
        # thread = threading.Thread(target=self._watch_relay_layer_buffer, args=(layer_id,), daemon=True)

        # self._layer_process = thread
        # thread.start()layer_id, buffer, node_buffer, running
        process = multiprocessing.Process(target=start_layer_buffer_watch, daemon=True,
                                          args=(layer_id, self._layer_buffer,
                                                self._relay_layer_buffer,
                                                self._processes_running))
        self._layer_process = process
        self._layer_process.start()
        # print("WATCH RELAY LAYER BUFFER {} {}".format(layer_id, self._layer_process.pid))
        #
        # self._worker_queue.append(process)
        # print("Register Relay {}".format(process.name))
    else:
        self._logger.warning("Current Link layer already watches a relay layer")
def shutdown(self)

Shutdowns the LinkLayer completely.

It first shutdowns all Threads and then all subprocesses. This method should be called over the method call pipe.

Expand source code
def shutdown(self):
    """Shutdowns the LinkLayer completely.

    It first shutdowns all Threads and then all subprocesses.
    This method should be called over the method call pipe.
    """
    # self._logger.debug("Shutdown Linklayer {}".format(self._listen_port))

    self._processes_running.value = 0
    self.running = False
    # print("Layer process: {}".format(self._layer_process.pid))
    # print("Shutdown LinkLayer")
    # self._logger.debug("Shutdown relay thread")
    self._relay_buffer_watch_thread.join()
    # print("Terminate LinkLayer layer process")
    self._layer_process.join(1)
    if self._layer_process.exitcode is None:
        self._layer_process.kill()
    # self._logger.debug("Shutdown layer process")
    # print("Terminate LinkLayer listening process")
    if self._listening_process is not None:
        # print("Listening process: {}".format(self._listening_process.pid))
        self._listening_process.join(1)
        # self._logger.debug("EXITCODE OF LISTENING PROCESS: ".format(self._listening_process.exitcode))
        if self._listening_process.exitcode is None:
            self._listening_process.kill()
    # self._logger.debug("Shutdown listening process")
    # print("Shutdown link layer")
    for running in self._relay_running_states.values():
        running = False
    self.stopped = True
def stop_relay_watch(self, relay_id)

Stops a watch of a relay buffer.

The method stops the watch from a relay buffer. This method should be called over the method call pipe.

Args

relay_id : RelayId
The RelayId of the Relay which buffer watch should be stopped.
Expand source code
def stop_relay_watch(self, relay_id):
    """Stops a watch of a relay buffer.

    The method stops the watch from a relay buffer.
    This method should be called over the method call pipe.

    Args:
        relay_id (RelayId): The RelayId of the Relay which buffer watch should be stopped.
    """
    if relay_id in self._relay_running_states:
        # print("Stopping relay watch", relay_id, self.__threads[relay_id].pid)
        self._relay_running_states[relay_id] = False
        # print(self.__running_states[relay_id].value)

        self._logger.debug("Process stopped for relay {}".format(relay_id))