Module RelayModel.LinkLayer
Expand source code
import collections
import multiprocessing
import queue
import threading
import time
import zmq
from RelayModel import RelayLogging, ModuleConfig
from RelayModel.Communication import SuccessMessage, FailureMessage, LayerMessage, TransmitMessage, Action, \
ProbeAction, ProbeFailAction, NotAuthorizedAction, PingAction, OutRelayClosedAction, InRelayClosedAction, Message
from RelayModel.Relay import Relay
from RelayModel.RelayId import RelayLayerId
def send_message(context, layer_id, message):
"""Used to send a message to a specific link layer.
This message can be used to send a message to another link layer using the lazy pirate pattern.
It creates a socket in the given context and tries to send the given message to the given layer id.
Args:
context (zmq.Context): Defines the context where the socket should created in.
layer_id (RelayLayerId): Defines the RID of the LinkLayer that needs to receive the message.
message (Object): The message that should be send. Normally this should be a TransmitMessage object
or a LayerMessage. Otherwise the other LinkLayer will deny the message. But in general every object can be
sent over the socket.
Returns:
bool: True if the message got accepted or is received by the link layer, False if the Message was denied or
could not be sent.
"""
# Send normally over sockets
socket = context.socket(zmq.REQ)
end_point = "tcp://" + str(layer_id.ip) + ":" + str(layer_id.port)
socket.connect(end_point)
socket.send_pyobj(message)
retries = ModuleConfig.POLL_TRIES
while True:
if (socket.poll(ModuleConfig.POLL_TIMEOUT) & zmq.POLLIN) != 0:
reply = socket.recv_pyobj()
socket.setsockopt(zmq.LINGER, 0)
socket.close()
if isinstance(reply, FailureMessage):
return False
else:
return True
retries -= 1
socket.setsockopt(zmq.LINGER, 0)
socket.close()
if retries == 0:
return False
socket = context.socket(zmq.REQ)
socket.connect(end_point)
socket.send_pyobj(message)
def start_link_layer(link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe):
"""Creates a link layer and waits until it got stopped.
This function should be called with a new process.
Example:
A basic example is given here. It creates a Process and starts it afterwards
process = multiprocessing.Process(target=start_link_layer, args=(link_layer_buffer, message_buffer, node_queue, layer_id.ip, layer_id.port, link_layer_pipe))
process.start()
Args:
All arguments are the same as the LinkLayer constructor arguments.
For further information see `LinkLayer` class docs.
"""
link_layer = LinkLayer(link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe)
while not link_layer.stopped:
time.sleep(0.1)
# link_layer._logger.debug("End funktion start link layer")
def start_layer_buffer_watch(layer_id, buffer, relay_layer_buffer, running):
"""Starts a watch over a RelayLayer buffer.
It watches a given buffer and if there occurs a message it tries to send it to the right endpoint.
This method should be called in a seperated process.
The function runs as long the running state is equal to 1 and tries to get a message from the buffer.
Args:
layer_id (RelayLayerId): Defines the layer id of the relay layer that registered in this function. This is only
needed for creating a logger file.
buffer (multiprocessing.Queue): Defines the buffer that should be watched as a multiprocessing queue.
relay_layer_buffer (multiprocessing.Queue): Defines the message buffer of a RelayLayer. This buffer is needed
to send messages to the same RelayLayer as registered.
running (multiprocessing.Value): Defines the running state of the function. If this value is set to 0 the
function stops. This is used to stop the function within a process from a parent process.
"""
logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL,
"link_layer_watch_layer_buffer_{}_{}".format(layer_id.ip, layer_id.port))
logger.debug("Start relay layer buffer watch for {}".format(layer_id))
context = zmq.Context()
while running.value == 1:
try:
message = buffer.get(True, 0.01)
except queue.Empty:
time.sleep(0.1)
else:
logger.debug("Layer buffer size: {}".format(buffer.qsize()))
if isinstance(message, LayerMessage):
if message.layer_id == layer_id:
# logger.debug("Message should be sent to own layer. Send to node!\n")
action = message.action
# check if action is an internal action
if isinstance(action, ProbeAction) or isinstance(action, ProbeFailAction) \
or isinstance(action, NotAuthorizedAction) or isinstance(action, PingAction) \
or isinstance(action, InRelayClosedAction) or isinstance(action, OutRelayClosedAction):
try:
relay_layer_buffer.put_nowait(message)
except BrokenPipeError:
print("BrokenPipeError in RelayLayerWatch first")
else:
logger.warning("Message with action {} is not an internal action and will not be "
"processed".format(action))
else:
result = send_message(context, message.layer_id, message)
if not result:
if ModuleConfig.CONSIDER_AS_CLOSED:
if isinstance(message.action, PingAction):
for ping in message.action.pings:
in_relay_closed_action = InRelayClosedAction([ping.key], message.layer_id,
ping.relay_id)
layer_message = LayerMessage(message.layer_id, in_relay_closed_action)
try:
relay_layer_buffer.put_nowait(layer_message)
except BrokenPipeError:
print("BrokenPipeError in RelayLAyerwatch second")
logger.warning("Message could not be transmitted to layer {}".format(message))
else:
logger.warning("Message is not in right format\n")
# logger.debug("End watching layer")
def start_listening(listen_ip, listen_port, relay_layer_buffer, running):
"""Starts listening on a specific ip and port and can be used to receive messages on this address.
The function creates a listening socket on the given address and tries, as long the running state is 1, to receive
messages from it. When a message is received it will get checked if it is a LayerMessage or a TransmitMessage.
If the format is correct it sends a `SuccessMessage` back to the sending LinkLayer. Otherwise it sends a
`FailureMessage` back. If the message is a valid Message the message will be provided to the RelayLayer of this
LinkLayer. This is done by inserting the message in the given RelayLayer message buffer.
Args:
listen_ip (str): The ip the socket should be listen on. This is only used for the logger file name. The socket
normally listens on every interface on a specific port.
listen_port (int): Defines the port the socket should listen on.
relay_layer_buffer (multiprocessing.Queue): Provides the message buffer of a RelayLayer. It is used to send
received messages to the RelayLayer so it can process the message.
running (multiprocessing.Value): Defines the running state of the function. If this value is set to 0 the
function stops. This is used to stop the function within a process from a parent process.
"""
context = zmq.Context()
listening_socket = context.socket(zmq.REP)
endpoint_string = "tcp://*:" + str(listen_port)
listening_socket.bind(endpoint_string)
logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL,
'link_layer_listening_socket_{}_{}'.format(listen_ip, listen_port))
logger.debug(f"Start listening thread for ip: {listen_ip} and port: {listen_port}")
while running.value == 1:
if (listening_socket.poll(ModuleConfig.POLL_TIMEOUT) & zmq.POLLIN) != 0:
message = listening_socket.recv_pyobj()
logger.debug(f"{listen_ip}:{listen_port}: Message received: {message}")
if isinstance(message, LayerMessage) or isinstance(message, TransmitMessage):
return_message = SuccessMessage("Success")
listening_socket.send_pyobj(return_message)
try:
relay_layer_buffer.put_nowait(message)
except BrokenPipeError:
print("Broken Pipe Error in listening thread")
else:
return_message = FailureMessage("Failure")
listening_socket.send_pyobj(return_message)
logger.debug(f"Stopped listening thread for ip: {listen_ip} and port: {listen_port}")
class LinkLayer:
"""The LinkLayer class provides the functionality to send and receive messages to other nodes.
The LinkLayer watches message buffers from the RelayLayer and Relays and sends every message to the right endpoint.
For further information about the functionality see the thesis of this library.
Attributes:
running (bool): Defines the running state of the LinkLayer. If this is False all Thread should be stopped.
stopped (bool): Defines the state if the LinkLayer is stopped. It is only set to True if the LinkLayer
completely stopped all threads and processes.
"""
def __init__(self, link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe):
"""Creates a LinkLayer object.
Which threads and processes the LinkLayer starts is described in the thesis of this library.
Args:
link_layer_buffer (multiprocessing.Queue): Defines the buffer where messages can be inserted. This messages
will be inserted from the LinkLayer in the Relay or RelayLayer buffers. Only tuples should be inserted.
The first entry of the tuple should either be a Relay object or a RelayLayerId. This depends on the
buffer the message in the second entry of the tuple should be inserted into.
relay_layer_buffer (multiprocessing.Queue): Defines the RelayLayer message queue. This buffer is used to
send messages to the RelayLayer that should be processed by it.
node_buffer (multiprocessing.Queue): Defines the Node message queue. This buffer is use to send messages to
the underlying node. This messages are mostly `Action` messages.
listen_ip (str): Defines the ip the LinkLayer should be reachable on.
listen_port (int): Defines the port the LinkLayer should be reachable on.
pipe (multiprocessing.Pipe): Defines a Pipe that is used to call methods in this LinkLayer from another
Process. There should only occur Triples of data where the first entry defines the method name. The
second entry defines if there is a response needed as a bool and the third entry is a list of arguments
needed to execute the method. E.g. ('is_buffer_empty', True, [relay_id])
"""
self._buffer = link_layer_buffer
self._node_buffer = node_buffer
self._relay_layer_buffer = relay_layer_buffer
self._listen_ip = listen_ip
self._listen_port = listen_port
self._relay_running_states = {}
self.running = True
self.stopped = False
self._processes_running = multiprocessing.Value("i", 1)
self._listening_process = None
self._layer_buffer = multiprocessing.Queue()
self._layer_process = None
self._relay_buffer = {}
self._context = zmq.Context()
self._method_call_pipe = pipe
self._logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL,
"link_layer_{}_{}".format(listen_ip, listen_port))
self._worker_thread = threading.Thread(target=self._start_worker_thread, daemon=True)
self._worker_thread.start()
self._method_calling_thread = threading.Thread(target=self._start_method_calling_thread, daemon=True)
self._method_calling_thread.start()
self._relay_buffer_watch_thread = threading.Thread(target=self._watch_relay_buffers, daemon=True)
self._relay_buffer_watch_thread.start()
self._init_socket()
def _start_worker_thread(self):
while self.running:
try:
information = self._buffer.get(True, 0.01)
self._logger.debug("Link layer buffer size: {}".format(self._buffer.qsize()))
except queue.Empty:
time.sleep(0.01)
else:
key = information[0]
message = information[1]
# self._logger.debug("Got message {}".format(information))
if isinstance(key, RelayLayerId):
self._layer_buffer.put_nowait(message)
elif isinstance(key, Relay):
if key.relay_id in self._relay_buffer:
self._relay_buffer[key.relay_id][0] = key
self._relay_buffer[key.relay_id][1].append(message)
elif key.relay_id not in self._relay_running_states:
self._relay_running_states[key.relay_id] = True
self._relay_buffer[key.relay_id] = [key, collections.deque()]
self._relay_buffer[key.relay_id][1].append(message)
# self._logger.warning("Stopped Worker Thread")
def _watch_relay_buffers(self):
self._logger.debug("Start relay buffer watches")
while self.running:
for buffer_list in self._relay_buffer.copy().values():
relay = buffer_list[0]
relay_id = relay.relay_id
buffer = buffer_list[1]
if len(buffer) > 0:
send_messages = []
self._logger.debug("Relay buffer size for relay {}: {}".format(relay_id, len(buffer)))
while len(buffer) > 0:
send_messages.append(buffer.popleft())
out_id = relay.out_relay.out_id
for message in send_messages:
if not self.running:
break
# logger.debug("Message received: {}".format(message))
if isinstance(message, TransmitMessage):
if out_id is not None:
# logger.debug(f"Message is TransmitMessage and should be transmitted to a relay layer with id "
# f"{out_id}\n")
if out_id.layer_id.ip == relay_id.layer_id.ip and out_id.layer_id.port == relay_id.layer_id.port:
# logger.debug(f"Receiving message: {message}\n")
message.receiving_relay = relay_id
self._relay_layer_buffer.put_nowait(message)
# else:
# logger.error(
# "Not supported message type {} was received by layer".format(message.__class__))
else:
result = send_message(self._context, out_id.layer_id, message)
if not result:
if ModuleConfig.CONSIDER_AS_CLOSED:
out_relay_closed = OutRelayClosedAction(out_id)
layer_message = LayerMessage(relay_id.layer_id, out_relay_closed)
self._relay_layer_buffer.put_nowait(layer_message)
self._logger.error("Message discarded: {}".format(message))
else:
message.receiving_relay = relay_id
self._relay_layer_buffer.put_nowait(message)
elif isinstance(message, Action):
# logger.debug("Action provided")
message.receiving_relay = relay_id
self._node_buffer.put_nowait(message)
else:
self._logger.debug("Message is not in right format")
else:
if not self._relay_running_states[relay_id]:
self._relay_buffer.pop(relay_id)
time.sleep(0.01)
# print("Stopping relay watch end process")
# self._logger.warning("Stopped watching relay buffers")
def _start_method_calling_thread(self):
self._logger.debug("Start method calling thread")
while self.running:
try:
msg = self._method_call_pipe.recv()
start_time = time.time()
method = msg[0]
needs_response = msg[1]
try:
args = list(msg[2])
except TypeError:
print(method)
raise TypeError
# self._logger.debug("Should call {}".format(method))
action_function = getattr(self, method)
if callable(action_function):
result = action_function(*args)
# self._logger.debug(f"Called {method} got {result}")
if needs_response:
self._method_call_pipe.send(result)
except EOFError:
self._logger.error("EOF ERROR")
break
except BrokenPipeError:
self._logger.error("Broken Pipe ERROR")
break
# self._logger.warning("Stopped Method Calling Thread")
def _init_socket(self):
process = multiprocessing.Process(target=start_listening, daemon=True, args=(self._listen_ip, self._listen_port,
self._relay_layer_buffer,
self._processes_running))
self._listening_process = process
self._listening_process.start()
# print("LINK LAYER START LISTENING {} {}".format(self._listen_port, self._listening_process.pid))
def is_buffer_empty(self, relay_id):
"""Checks if a buffer for a given relay id defining a relay is empty.
The method checks if there is a buffer for the given relay id and checks if it is empty.
This method should be called with the method calling pipe.
Args:
relay_id (RelayId): The RelayId that defines the Relay which buffer should be checked.
Returns:
bool: True if buffer is not existent or empty. False if the buffer has an entry.
"""
if relay_id in self._relay_buffer:
return len(self._relay_buffer[relay_id]) == 0
else:
return True
def check_key_in_relay_buffer(self, relay_id, check_key):
"""Checks the buffer of a given Relay if there is a key in one of the Actions.
First the method checks if the buffer exists for the given relay. If there is a buffer it checks if the buffer
is empty. Both conditions return False. After that the message checks all messages in the buffer if there is a
key set in one parameter set as a RelayParameter.
This method should be called with the method calling pipe.
Args:
relay_id (RelayId): Defines the RelayId of the Relay which buffer should be checked.
check_key (str): Defines the key that should be checked.
Returns:
bool: True if there is the given key in one RelayParameter in the buffer of the given Relay. False otherwise
"""
if relay_id in self._relay_buffer:
if len(self._relay_buffer[relay_id]) == 0:
return False
else:
for message in self._relay_buffer[relay_id].copy():
if isinstance(message, Message) \
and message.action.check_has_key_as_parameter(check_key):
return True
elif isinstance(message, TransmitMessage) \
and message.message.action.check_has_key_as_parameter(check_key):
return True
elif isinstance(message, Action) and message.check_has_key_as_parameter(check_key):
return True
return False
else:
return False
def register_relay_layer(self, layer_id):
"""Registers a RelayLayer and creates a buffer watch process.
The LinkLayer creates the buffer watch process for the relay layer. It only creates the process if it is not set
yet.
Args:
layer_id (RelayLayerId): Defines the RelayLayerId of the RelayLayer which buffer should be watched.
"""
# print("registered relay layer {}".format(layer_id))
if self._layer_process is None:
# thread = threading.Thread(target=self._watch_relay_layer_buffer, args=(layer_id,), daemon=True)
# self._layer_process = thread
# thread.start()layer_id, buffer, node_buffer, running
process = multiprocessing.Process(target=start_layer_buffer_watch, daemon=True,
args=(layer_id, self._layer_buffer,
self._relay_layer_buffer,
self._processes_running))
self._layer_process = process
self._layer_process.start()
# print("WATCH RELAY LAYER BUFFER {} {}".format(layer_id, self._layer_process.pid))
#
# self._worker_queue.append(process)
# print("Register Relay {}".format(process.name))
else:
self._logger.warning("Current Link layer already watches a relay layer")
def shutdown(self):
"""Shutdowns the LinkLayer completely.
It first shutdowns all Threads and then all subprocesses.
This method should be called over the method call pipe.
"""
# self._logger.debug("Shutdown Linklayer {}".format(self._listen_port))
self._processes_running.value = 0
self.running = False
# print("Layer process: {}".format(self._layer_process.pid))
# print("Shutdown LinkLayer")
# self._logger.debug("Shutdown relay thread")
self._relay_buffer_watch_thread.join()
# print("Terminate LinkLayer layer process")
self._layer_process.join(1)
if self._layer_process.exitcode is None:
self._layer_process.kill()
# self._logger.debug("Shutdown layer process")
# print("Terminate LinkLayer listening process")
if self._listening_process is not None:
# print("Listening process: {}".format(self._listening_process.pid))
self._listening_process.join(1)
# self._logger.debug("EXITCODE OF LISTENING PROCESS: ".format(self._listening_process.exitcode))
if self._listening_process.exitcode is None:
self._listening_process.kill()
# self._logger.debug("Shutdown listening process")
# print("Shutdown link layer")
for running in self._relay_running_states.values():
running = False
self.stopped = True
# print("Done Shutdown")
# self._logger.debug("Done shutdown Linklayer {}".format(self._listen_port))
def stop_relay_watch(self, relay_id):
"""Stops a watch of a relay buffer.
The method stops the watch from a relay buffer.
This method should be called over the method call pipe.
Args:
relay_id (RelayId): The RelayId of the Relay which buffer watch should be stopped.
"""
if relay_id in self._relay_running_states:
# print("Stopping relay watch", relay_id, self.__threads[relay_id].pid)
self._relay_running_states[relay_id] = False
# print(self.__running_states[relay_id].value)
self._logger.debug("Process stopped for relay {}".format(relay_id))
Functions
def send_message(context, layer_id, message)-
Used to send a message to a specific link layer.
This message can be used to send a message to another link layer using the lazy pirate pattern. It creates a socket in the given context and tries to send the given message to the given layer id.
Args
context:zmq.Context- Defines the context where the socket should created in.
layer_id:RelayLayerId- Defines the RID of the LinkLayer that needs to receive the message.
message:Object- The message that should be send. Normally this should be a TransmitMessage object or a LayerMessage. Otherwise the other LinkLayer will deny the message. But in general every object can be sent over the socket.
Returns
bool- True if the message got accepted or is received by the link layer, False if the Message was denied or could not be sent.
Expand source code
def send_message(context, layer_id, message): """Used to send a message to a specific link layer. This message can be used to send a message to another link layer using the lazy pirate pattern. It creates a socket in the given context and tries to send the given message to the given layer id. Args: context (zmq.Context): Defines the context where the socket should created in. layer_id (RelayLayerId): Defines the RID of the LinkLayer that needs to receive the message. message (Object): The message that should be send. Normally this should be a TransmitMessage object or a LayerMessage. Otherwise the other LinkLayer will deny the message. But in general every object can be sent over the socket. Returns: bool: True if the message got accepted or is received by the link layer, False if the Message was denied or could not be sent. """ # Send normally over sockets socket = context.socket(zmq.REQ) end_point = "tcp://" + str(layer_id.ip) + ":" + str(layer_id.port) socket.connect(end_point) socket.send_pyobj(message) retries = ModuleConfig.POLL_TRIES while True: if (socket.poll(ModuleConfig.POLL_TIMEOUT) & zmq.POLLIN) != 0: reply = socket.recv_pyobj() socket.setsockopt(zmq.LINGER, 0) socket.close() if isinstance(reply, FailureMessage): return False else: return True retries -= 1 socket.setsockopt(zmq.LINGER, 0) socket.close() if retries == 0: return False socket = context.socket(zmq.REQ) socket.connect(end_point) socket.send_pyobj(message) def start_layer_buffer_watch(layer_id, buffer, relay_layer_buffer, running)-
Starts a watch over a RelayLayer buffer.
It watches a given buffer and if there occurs a message it tries to send it to the right endpoint. This method should be called in a seperated process. The function runs as long the running state is equal to 1 and tries to get a message from the buffer.
Args
layer_id:RelayLayerId- Defines the layer id of the relay layer that registered in this function. This is only needed for creating a logger file.
buffer:multiprocessing.Queue- Defines the buffer that should be watched as a multiprocessing queue.
relay_layer_buffer:multiprocessing.Queue- Defines the message buffer of a RelayLayer. This buffer is needed to send messages to the same RelayLayer as registered.
running:multiprocessing.Value- Defines the running state of the function. If this value is set to 0 the function stops. This is used to stop the function within a process from a parent process.
Expand source code
def start_layer_buffer_watch(layer_id, buffer, relay_layer_buffer, running): """Starts a watch over a RelayLayer buffer. It watches a given buffer and if there occurs a message it tries to send it to the right endpoint. This method should be called in a seperated process. The function runs as long the running state is equal to 1 and tries to get a message from the buffer. Args: layer_id (RelayLayerId): Defines the layer id of the relay layer that registered in this function. This is only needed for creating a logger file. buffer (multiprocessing.Queue): Defines the buffer that should be watched as a multiprocessing queue. relay_layer_buffer (multiprocessing.Queue): Defines the message buffer of a RelayLayer. This buffer is needed to send messages to the same RelayLayer as registered. running (multiprocessing.Value): Defines the running state of the function. If this value is set to 0 the function stops. This is used to stop the function within a process from a parent process. """ logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL, "link_layer_watch_layer_buffer_{}_{}".format(layer_id.ip, layer_id.port)) logger.debug("Start relay layer buffer watch for {}".format(layer_id)) context = zmq.Context() while running.value == 1: try: message = buffer.get(True, 0.01) except queue.Empty: time.sleep(0.1) else: logger.debug("Layer buffer size: {}".format(buffer.qsize())) if isinstance(message, LayerMessage): if message.layer_id == layer_id: # logger.debug("Message should be sent to own layer. Send to node!\n") action = message.action # check if action is an internal action if isinstance(action, ProbeAction) or isinstance(action, ProbeFailAction) \ or isinstance(action, NotAuthorizedAction) or isinstance(action, PingAction) \ or isinstance(action, InRelayClosedAction) or isinstance(action, OutRelayClosedAction): try: relay_layer_buffer.put_nowait(message) except BrokenPipeError: print("BrokenPipeError in RelayLayerWatch first") else: logger.warning("Message with action {} is not an internal action and will not be " "processed".format(action)) else: result = send_message(context, message.layer_id, message) if not result: if ModuleConfig.CONSIDER_AS_CLOSED: if isinstance(message.action, PingAction): for ping in message.action.pings: in_relay_closed_action = InRelayClosedAction([ping.key], message.layer_id, ping.relay_id) layer_message = LayerMessage(message.layer_id, in_relay_closed_action) try: relay_layer_buffer.put_nowait(layer_message) except BrokenPipeError: print("BrokenPipeError in RelayLAyerwatch second") logger.warning("Message could not be transmitted to layer {}".format(message)) else: logger.warning("Message is not in right format\n") def start_link_layer(link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe)-
Creates a link layer and waits until it got stopped.
This function should be called with a new process.
Example
A basic example is given here. It creates a Process and starts it afterwards
process = multiprocessing.Process(target=start_link_layer, args=(link_layer_buffer, message_buffer, node_queue, layer_id.ip, layer_id.port, link_layer_pipe)) process.start()Args
All arguments are the same as the LinkLayer constructor arguments. For further information see
LinkLayerclass docs.Expand source code
def start_link_layer(link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe): """Creates a link layer and waits until it got stopped. This function should be called with a new process. Example: A basic example is given here. It creates a Process and starts it afterwards process = multiprocessing.Process(target=start_link_layer, args=(link_layer_buffer, message_buffer, node_queue, layer_id.ip, layer_id.port, link_layer_pipe)) process.start() Args: All arguments are the same as the LinkLayer constructor arguments. For further information see `LinkLayer` class docs. """ link_layer = LinkLayer(link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe) while not link_layer.stopped: time.sleep(0.1) def start_listening(listen_ip, listen_port, relay_layer_buffer, running)-
Starts listening on a specific ip and port and can be used to receive messages on this address.
The function creates a listening socket on the given address and tries, as long the running state is 1, to receive messages from it. When a message is received it will get checked if it is a LayerMessage or a TransmitMessage. If the format is correct it sends a
SuccessMessageback to the sending LinkLayer. Otherwise it sends aFailureMessageback. If the message is a valid Message the message will be provided to the RelayLayer of this LinkLayer. This is done by inserting the message in the given RelayLayer message buffer.Args
listen_ip:str- The ip the socket should be listen on. This is only used for the logger file name. The socket normally listens on every interface on a specific port.
listen_port:int- Defines the port the socket should listen on.
relay_layer_buffer:multiprocessing.Queue- Provides the message buffer of a RelayLayer. It is used to send received messages to the RelayLayer so it can process the message.
running:multiprocessing.Value- Defines the running state of the function. If this value is set to 0 the function stops. This is used to stop the function within a process from a parent process.
Expand source code
def start_listening(listen_ip, listen_port, relay_layer_buffer, running): """Starts listening on a specific ip and port and can be used to receive messages on this address. The function creates a listening socket on the given address and tries, as long the running state is 1, to receive messages from it. When a message is received it will get checked if it is a LayerMessage or a TransmitMessage. If the format is correct it sends a `SuccessMessage` back to the sending LinkLayer. Otherwise it sends a `FailureMessage` back. If the message is a valid Message the message will be provided to the RelayLayer of this LinkLayer. This is done by inserting the message in the given RelayLayer message buffer. Args: listen_ip (str): The ip the socket should be listen on. This is only used for the logger file name. The socket normally listens on every interface on a specific port. listen_port (int): Defines the port the socket should listen on. relay_layer_buffer (multiprocessing.Queue): Provides the message buffer of a RelayLayer. It is used to send received messages to the RelayLayer so it can process the message. running (multiprocessing.Value): Defines the running state of the function. If this value is set to 0 the function stops. This is used to stop the function within a process from a parent process. """ context = zmq.Context() listening_socket = context.socket(zmq.REP) endpoint_string = "tcp://*:" + str(listen_port) listening_socket.bind(endpoint_string) logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL, 'link_layer_listening_socket_{}_{}'.format(listen_ip, listen_port)) logger.debug(f"Start listening thread for ip: {listen_ip} and port: {listen_port}") while running.value == 1: if (listening_socket.poll(ModuleConfig.POLL_TIMEOUT) & zmq.POLLIN) != 0: message = listening_socket.recv_pyobj() logger.debug(f"{listen_ip}:{listen_port}: Message received: {message}") if isinstance(message, LayerMessage) or isinstance(message, TransmitMessage): return_message = SuccessMessage("Success") listening_socket.send_pyobj(return_message) try: relay_layer_buffer.put_nowait(message) except BrokenPipeError: print("Broken Pipe Error in listening thread") else: return_message = FailureMessage("Failure") listening_socket.send_pyobj(return_message) logger.debug(f"Stopped listening thread for ip: {listen_ip} and port: {listen_port}")
Classes
class LinkLayer (link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe)-
The LinkLayer class provides the functionality to send and receive messages to other nodes.
The LinkLayer watches message buffers from the RelayLayer and Relays and sends every message to the right endpoint. For further information about the functionality see the thesis of this library.
Attributes
running:bool- Defines the running state of the LinkLayer. If this is False all Thread should be stopped.
stopped:bool- Defines the state if the LinkLayer is stopped. It is only set to True if the LinkLayer completely stopped all threads and processes.
Creates a LinkLayer object.
Which threads and processes the LinkLayer starts is described in the thesis of this library.
Args
link_layer_buffer:multiprocessing.Queue- Defines the buffer where messages can be inserted. This messages will be inserted from the LinkLayer in the Relay or RelayLayer buffers. Only tuples should be inserted. The first entry of the tuple should either be a Relay object or a RelayLayerId. This depends on the buffer the message in the second entry of the tuple should be inserted into.
relay_layer_buffer:multiprocessing.Queue- Defines the RelayLayer message queue. This buffer is used to send messages to the RelayLayer that should be processed by it.
node_buffer:multiprocessing.Queue- Defines the Node message queue. This buffer is use to send messages to
the underlying node. This messages are mostly
Actionmessages. listen_ip:str- Defines the ip the LinkLayer should be reachable on.
listen_port:int- Defines the port the LinkLayer should be reachable on.
pipe:multiprocessing.Pipe- Defines a Pipe that is used to call methods in this LinkLayer from another Process. There should only occur Triples of data where the first entry defines the method name. The second entry defines if there is a response needed as a bool and the third entry is a list of arguments needed to execute the method. E.g. ('is_buffer_empty', True, [relay_id])
Expand source code
class LinkLayer: """The LinkLayer class provides the functionality to send and receive messages to other nodes. The LinkLayer watches message buffers from the RelayLayer and Relays and sends every message to the right endpoint. For further information about the functionality see the thesis of this library. Attributes: running (bool): Defines the running state of the LinkLayer. If this is False all Thread should be stopped. stopped (bool): Defines the state if the LinkLayer is stopped. It is only set to True if the LinkLayer completely stopped all threads and processes. """ def __init__(self, link_layer_buffer, relay_layer_buffer, node_buffer, listen_ip, listen_port, pipe): """Creates a LinkLayer object. Which threads and processes the LinkLayer starts is described in the thesis of this library. Args: link_layer_buffer (multiprocessing.Queue): Defines the buffer where messages can be inserted. This messages will be inserted from the LinkLayer in the Relay or RelayLayer buffers. Only tuples should be inserted. The first entry of the tuple should either be a Relay object or a RelayLayerId. This depends on the buffer the message in the second entry of the tuple should be inserted into. relay_layer_buffer (multiprocessing.Queue): Defines the RelayLayer message queue. This buffer is used to send messages to the RelayLayer that should be processed by it. node_buffer (multiprocessing.Queue): Defines the Node message queue. This buffer is use to send messages to the underlying node. This messages are mostly `Action` messages. listen_ip (str): Defines the ip the LinkLayer should be reachable on. listen_port (int): Defines the port the LinkLayer should be reachable on. pipe (multiprocessing.Pipe): Defines a Pipe that is used to call methods in this LinkLayer from another Process. There should only occur Triples of data where the first entry defines the method name. The second entry defines if there is a response needed as a bool and the third entry is a list of arguments needed to execute the method. E.g. ('is_buffer_empty', True, [relay_id]) """ self._buffer = link_layer_buffer self._node_buffer = node_buffer self._relay_layer_buffer = relay_layer_buffer self._listen_ip = listen_ip self._listen_port = listen_port self._relay_running_states = {} self.running = True self.stopped = False self._processes_running = multiprocessing.Value("i", 1) self._listening_process = None self._layer_buffer = multiprocessing.Queue() self._layer_process = None self._relay_buffer = {} self._context = zmq.Context() self._method_call_pipe = pipe self._logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL, "link_layer_{}_{}".format(listen_ip, listen_port)) self._worker_thread = threading.Thread(target=self._start_worker_thread, daemon=True) self._worker_thread.start() self._method_calling_thread = threading.Thread(target=self._start_method_calling_thread, daemon=True) self._method_calling_thread.start() self._relay_buffer_watch_thread = threading.Thread(target=self._watch_relay_buffers, daemon=True) self._relay_buffer_watch_thread.start() self._init_socket() def _start_worker_thread(self): while self.running: try: information = self._buffer.get(True, 0.01) self._logger.debug("Link layer buffer size: {}".format(self._buffer.qsize())) except queue.Empty: time.sleep(0.01) else: key = information[0] message = information[1] # self._logger.debug("Got message {}".format(information)) if isinstance(key, RelayLayerId): self._layer_buffer.put_nowait(message) elif isinstance(key, Relay): if key.relay_id in self._relay_buffer: self._relay_buffer[key.relay_id][0] = key self._relay_buffer[key.relay_id][1].append(message) elif key.relay_id not in self._relay_running_states: self._relay_running_states[key.relay_id] = True self._relay_buffer[key.relay_id] = [key, collections.deque()] self._relay_buffer[key.relay_id][1].append(message) # self._logger.warning("Stopped Worker Thread") def _watch_relay_buffers(self): self._logger.debug("Start relay buffer watches") while self.running: for buffer_list in self._relay_buffer.copy().values(): relay = buffer_list[0] relay_id = relay.relay_id buffer = buffer_list[1] if len(buffer) > 0: send_messages = [] self._logger.debug("Relay buffer size for relay {}: {}".format(relay_id, len(buffer))) while len(buffer) > 0: send_messages.append(buffer.popleft()) out_id = relay.out_relay.out_id for message in send_messages: if not self.running: break # logger.debug("Message received: {}".format(message)) if isinstance(message, TransmitMessage): if out_id is not None: # logger.debug(f"Message is TransmitMessage and should be transmitted to a relay layer with id " # f"{out_id}\n") if out_id.layer_id.ip == relay_id.layer_id.ip and out_id.layer_id.port == relay_id.layer_id.port: # logger.debug(f"Receiving message: {message}\n") message.receiving_relay = relay_id self._relay_layer_buffer.put_nowait(message) # else: # logger.error( # "Not supported message type {} was received by layer".format(message.__class__)) else: result = send_message(self._context, out_id.layer_id, message) if not result: if ModuleConfig.CONSIDER_AS_CLOSED: out_relay_closed = OutRelayClosedAction(out_id) layer_message = LayerMessage(relay_id.layer_id, out_relay_closed) self._relay_layer_buffer.put_nowait(layer_message) self._logger.error("Message discarded: {}".format(message)) else: message.receiving_relay = relay_id self._relay_layer_buffer.put_nowait(message) elif isinstance(message, Action): # logger.debug("Action provided") message.receiving_relay = relay_id self._node_buffer.put_nowait(message) else: self._logger.debug("Message is not in right format") else: if not self._relay_running_states[relay_id]: self._relay_buffer.pop(relay_id) time.sleep(0.01) # print("Stopping relay watch end process") # self._logger.warning("Stopped watching relay buffers") def _start_method_calling_thread(self): self._logger.debug("Start method calling thread") while self.running: try: msg = self._method_call_pipe.recv() start_time = time.time() method = msg[0] needs_response = msg[1] try: args = list(msg[2]) except TypeError: print(method) raise TypeError # self._logger.debug("Should call {}".format(method)) action_function = getattr(self, method) if callable(action_function): result = action_function(*args) # self._logger.debug(f"Called {method} got {result}") if needs_response: self._method_call_pipe.send(result) except EOFError: self._logger.error("EOF ERROR") break except BrokenPipeError: self._logger.error("Broken Pipe ERROR") break # self._logger.warning("Stopped Method Calling Thread") def _init_socket(self): process = multiprocessing.Process(target=start_listening, daemon=True, args=(self._listen_ip, self._listen_port, self._relay_layer_buffer, self._processes_running)) self._listening_process = process self._listening_process.start() # print("LINK LAYER START LISTENING {} {}".format(self._listen_port, self._listening_process.pid)) def is_buffer_empty(self, relay_id): """Checks if a buffer for a given relay id defining a relay is empty. The method checks if there is a buffer for the given relay id and checks if it is empty. This method should be called with the method calling pipe. Args: relay_id (RelayId): The RelayId that defines the Relay which buffer should be checked. Returns: bool: True if buffer is not existent or empty. False if the buffer has an entry. """ if relay_id in self._relay_buffer: return len(self._relay_buffer[relay_id]) == 0 else: return True def check_key_in_relay_buffer(self, relay_id, check_key): """Checks the buffer of a given Relay if there is a key in one of the Actions. First the method checks if the buffer exists for the given relay. If there is a buffer it checks if the buffer is empty. Both conditions return False. After that the message checks all messages in the buffer if there is a key set in one parameter set as a RelayParameter. This method should be called with the method calling pipe. Args: relay_id (RelayId): Defines the RelayId of the Relay which buffer should be checked. check_key (str): Defines the key that should be checked. Returns: bool: True if there is the given key in one RelayParameter in the buffer of the given Relay. False otherwise """ if relay_id in self._relay_buffer: if len(self._relay_buffer[relay_id]) == 0: return False else: for message in self._relay_buffer[relay_id].copy(): if isinstance(message, Message) \ and message.action.check_has_key_as_parameter(check_key): return True elif isinstance(message, TransmitMessage) \ and message.message.action.check_has_key_as_parameter(check_key): return True elif isinstance(message, Action) and message.check_has_key_as_parameter(check_key): return True return False else: return False def register_relay_layer(self, layer_id): """Registers a RelayLayer and creates a buffer watch process. The LinkLayer creates the buffer watch process for the relay layer. It only creates the process if it is not set yet. Args: layer_id (RelayLayerId): Defines the RelayLayerId of the RelayLayer which buffer should be watched. """ # print("registered relay layer {}".format(layer_id)) if self._layer_process is None: # thread = threading.Thread(target=self._watch_relay_layer_buffer, args=(layer_id,), daemon=True) # self._layer_process = thread # thread.start()layer_id, buffer, node_buffer, running process = multiprocessing.Process(target=start_layer_buffer_watch, daemon=True, args=(layer_id, self._layer_buffer, self._relay_layer_buffer, self._processes_running)) self._layer_process = process self._layer_process.start() # print("WATCH RELAY LAYER BUFFER {} {}".format(layer_id, self._layer_process.pid)) # # self._worker_queue.append(process) # print("Register Relay {}".format(process.name)) else: self._logger.warning("Current Link layer already watches a relay layer") def shutdown(self): """Shutdowns the LinkLayer completely. It first shutdowns all Threads and then all subprocesses. This method should be called over the method call pipe. """ # self._logger.debug("Shutdown Linklayer {}".format(self._listen_port)) self._processes_running.value = 0 self.running = False # print("Layer process: {}".format(self._layer_process.pid)) # print("Shutdown LinkLayer") # self._logger.debug("Shutdown relay thread") self._relay_buffer_watch_thread.join() # print("Terminate LinkLayer layer process") self._layer_process.join(1) if self._layer_process.exitcode is None: self._layer_process.kill() # self._logger.debug("Shutdown layer process") # print("Terminate LinkLayer listening process") if self._listening_process is not None: # print("Listening process: {}".format(self._listening_process.pid)) self._listening_process.join(1) # self._logger.debug("EXITCODE OF LISTENING PROCESS: ".format(self._listening_process.exitcode)) if self._listening_process.exitcode is None: self._listening_process.kill() # self._logger.debug("Shutdown listening process") # print("Shutdown link layer") for running in self._relay_running_states.values(): running = False self.stopped = True # print("Done Shutdown") # self._logger.debug("Done shutdown Linklayer {}".format(self._listen_port)) def stop_relay_watch(self, relay_id): """Stops a watch of a relay buffer. The method stops the watch from a relay buffer. This method should be called over the method call pipe. Args: relay_id (RelayId): The RelayId of the Relay which buffer watch should be stopped. """ if relay_id in self._relay_running_states: # print("Stopping relay watch", relay_id, self.__threads[relay_id].pid) self._relay_running_states[relay_id] = False # print(self.__running_states[relay_id].value) self._logger.debug("Process stopped for relay {}".format(relay_id))Methods
def check_key_in_relay_buffer(self, relay_id, check_key)-
Checks the buffer of a given Relay if there is a key in one of the Actions.
First the method checks if the buffer exists for the given relay. If there is a buffer it checks if the buffer is empty. Both conditions return False. After that the message checks all messages in the buffer if there is a key set in one parameter set as a RelayParameter. This method should be called with the method calling pipe.
Args
relay_id:RelayId- Defines the RelayId of the Relay which buffer should be checked.
check_key:str- Defines the key that should be checked.
Returns
bool- True if there is the given key in one RelayParameter in the buffer of the given Relay. False otherwise
Expand source code
def check_key_in_relay_buffer(self, relay_id, check_key): """Checks the buffer of a given Relay if there is a key in one of the Actions. First the method checks if the buffer exists for the given relay. If there is a buffer it checks if the buffer is empty. Both conditions return False. After that the message checks all messages in the buffer if there is a key set in one parameter set as a RelayParameter. This method should be called with the method calling pipe. Args: relay_id (RelayId): Defines the RelayId of the Relay which buffer should be checked. check_key (str): Defines the key that should be checked. Returns: bool: True if there is the given key in one RelayParameter in the buffer of the given Relay. False otherwise """ if relay_id in self._relay_buffer: if len(self._relay_buffer[relay_id]) == 0: return False else: for message in self._relay_buffer[relay_id].copy(): if isinstance(message, Message) \ and message.action.check_has_key_as_parameter(check_key): return True elif isinstance(message, TransmitMessage) \ and message.message.action.check_has_key_as_parameter(check_key): return True elif isinstance(message, Action) and message.check_has_key_as_parameter(check_key): return True return False else: return False def is_buffer_empty(self, relay_id)-
Checks if a buffer for a given relay id defining a relay is empty.
The method checks if there is a buffer for the given relay id and checks if it is empty. This method should be called with the method calling pipe.
Args
relay_id:RelayId- The RelayId that defines the Relay which buffer should be checked.
Returns
bool- True if buffer is not existent or empty. False if the buffer has an entry.
Expand source code
def is_buffer_empty(self, relay_id): """Checks if a buffer for a given relay id defining a relay is empty. The method checks if there is a buffer for the given relay id and checks if it is empty. This method should be called with the method calling pipe. Args: relay_id (RelayId): The RelayId that defines the Relay which buffer should be checked. Returns: bool: True if buffer is not existent or empty. False if the buffer has an entry. """ if relay_id in self._relay_buffer: return len(self._relay_buffer[relay_id]) == 0 else: return True def register_relay_layer(self, layer_id)-
Registers a RelayLayer and creates a buffer watch process.
The LinkLayer creates the buffer watch process for the relay layer. It only creates the process if it is not set yet.
Args
layer_id:RelayLayerId- Defines the RelayLayerId of the RelayLayer which buffer should be watched.
Expand source code
def register_relay_layer(self, layer_id): """Registers a RelayLayer and creates a buffer watch process. The LinkLayer creates the buffer watch process for the relay layer. It only creates the process if it is not set yet. Args: layer_id (RelayLayerId): Defines the RelayLayerId of the RelayLayer which buffer should be watched. """ # print("registered relay layer {}".format(layer_id)) if self._layer_process is None: # thread = threading.Thread(target=self._watch_relay_layer_buffer, args=(layer_id,), daemon=True) # self._layer_process = thread # thread.start()layer_id, buffer, node_buffer, running process = multiprocessing.Process(target=start_layer_buffer_watch, daemon=True, args=(layer_id, self._layer_buffer, self._relay_layer_buffer, self._processes_running)) self._layer_process = process self._layer_process.start() # print("WATCH RELAY LAYER BUFFER {} {}".format(layer_id, self._layer_process.pid)) # # self._worker_queue.append(process) # print("Register Relay {}".format(process.name)) else: self._logger.warning("Current Link layer already watches a relay layer") def shutdown(self)-
Shutdowns the LinkLayer completely.
It first shutdowns all Threads and then all subprocesses. This method should be called over the method call pipe.
Expand source code
def shutdown(self): """Shutdowns the LinkLayer completely. It first shutdowns all Threads and then all subprocesses. This method should be called over the method call pipe. """ # self._logger.debug("Shutdown Linklayer {}".format(self._listen_port)) self._processes_running.value = 0 self.running = False # print("Layer process: {}".format(self._layer_process.pid)) # print("Shutdown LinkLayer") # self._logger.debug("Shutdown relay thread") self._relay_buffer_watch_thread.join() # print("Terminate LinkLayer layer process") self._layer_process.join(1) if self._layer_process.exitcode is None: self._layer_process.kill() # self._logger.debug("Shutdown layer process") # print("Terminate LinkLayer listening process") if self._listening_process is not None: # print("Listening process: {}".format(self._listening_process.pid)) self._listening_process.join(1) # self._logger.debug("EXITCODE OF LISTENING PROCESS: ".format(self._listening_process.exitcode)) if self._listening_process.exitcode is None: self._listening_process.kill() # self._logger.debug("Shutdown listening process") # print("Shutdown link layer") for running in self._relay_running_states.values(): running = False self.stopped = True def stop_relay_watch(self, relay_id)-
Stops a watch of a relay buffer.
The method stops the watch from a relay buffer. This method should be called over the method call pipe.
Args
relay_id:RelayId- The RelayId of the Relay which buffer watch should be stopped.
Expand source code
def stop_relay_watch(self, relay_id): """Stops a watch of a relay buffer. The method stops the watch from a relay buffer. This method should be called over the method call pipe. Args: relay_id (RelayId): The RelayId of the Relay which buffer watch should be stopped. """ if relay_id in self._relay_running_states: # print("Stopping relay watch", relay_id, self.__threads[relay_id].pid) self._relay_running_states[relay_id] = False # print(self.__running_states[relay_id].value) self._logger.debug("Process stopped for relay {}".format(relay_id))