# Module RelayModel.RelayLayer
import multiprocessing
import queue
import threading
import time
from collections import deque
from RelayModel import RelayLogging, KeyGeneration, ConceptChange
from RelayModel.Communication import *
from RelayModel.LinkLayer import start_link_layer
from RelayModel.Relay import *
from RelayModel.RelayId import *
class RelayLayer:
"""Represents the implementation of the Relay Layer in the relay model.
The RelayLayer class handles all connections over Relays and tries to stabilize the Relay information by
communicating with other RelayLayers.
It starts a LinkLayer inside a separated process to send messages over Relays. Furthermore it starts three threads.
The first thread sends messages to the LinkLayer. The second Thread is the timeout thread which periodically
executes the timeout method of the RelayLayer. The last thread watches the message buffer.
For further information about the RelayLayer see the thesis of this library.
Attributes:
link_layer_buffer (multiprocessing.Queue): Stores the buffer of the LinkLayer where messages should be inserted.
These messages are put into the right buffer and are processed by the LinkLayer. The objects
that are inserted are always a tuple of data. The first entry of the tuple is either a Relay object, if the
message should be inserted into a relay buffer, or a RelayLayerId, if the message should be sent to another
RelayLayer.
link_layer_call_pipe (multiprocessing.Pipe): Holds the communication pipe of the LinkLayer. This pipe is used to
call methods inside the LinkLayer process. A method can be called with the internal method
_call_link_layer_message.
message_buffer (multiprocessing.Queue): This buffer holds messages that needs to be processed by this
RelayLayer. It gets filled by the LinkLayer if it receives a message.
link_layer_process (multiprocessing.Process): Holds the LinkLayer process executing the LinkLayer.
RID (RelayLayerId): Holds the id defining this specific relay layer.
relays (dict): Holds a dict of relays. The key is the RelayId of the specific relay and the value the Relay
object.
last_id (int): Stores the last relay id number that was used to create a relay. It is incremented whenever a new
relay is created.
active (bool): Holds the active state of the RelayLayer. If this is set to False the RelayLayer tries to
shutdown. By default this is set to True.
running (bool): Holds the running state of the RelayLayer. If this is set to False the RelayLayer is stopped
and all threads are shutting down.
calling_lock (threading.Lock): Holds a Lock object to prevent simultaneous access to the link_layer_call_pipe.
logger (logging.Logger): Holds the Logger object of this class.
buffer_put_actions (deque): Defines a buffer that collects every message that needs to be inserted into a buffer
of the LinkLayer. The buffer_put_thread sends these messages to the LinkLayer.
buffer_put_thread (threading.Thread): The thread removes every message contained in buffer_put_actions and
sends it to the LinkLayer through the link_layer_buffer.
message_handling_thread (threading.Thread): Holds the thread that executes every action sent to the RelayLayer.
The messages are inside the message_buffer attribute.
timeout_thread (threading.Thread): Holds the timeout thread where the RelayLayer periodically executes the
timeout method.
relay_windows (dict): Stores the windows of each relay for DoS attack mitigation. The key is the RelayId and the
value is the window for the specific Relay as a deque object.
transmit_count (dict): Stores the transmit count of a specific relay. The key is the RelayId and the value is
the count of transmit messages sent over the specific Relay.
"""
def __init__(self, layer_id, node_queue):
    """Create the RelayLayer, start its LinkLayer process and its worker threads.

    All attributes are initialized before the worker threads are started, so
    that no thread can observe a partially constructed RelayLayer.

    Args:
        layer_id (RelayLayerId): Identifies this RelayLayer and gets stored in the RID attribute. Its ip and port
            are handed to the LinkLayer process.
        node_queue (multiprocessing.Queue): The buffer of the node. It is handed to the LinkLayer process.
    """
    # Channels shared with the LinkLayer process.
    self.link_layer_buffer = multiprocessing.Queue()
    self.link_layer_call_pipe, link_layer_pipe = multiprocessing.Pipe(True)
    self.message_buffer = multiprocessing.Queue()
    self.link_layer_process = multiprocessing.Process(target=start_link_layer, args=(self.link_layer_buffer,
                                                                                     self.message_buffer,
                                                                                     node_queue,
                                                                                     layer_id.ip, layer_id.port,
                                                                                     link_layer_pipe))
    self.link_layer_process.start()

    # Plain state. Everything the threads below may touch is set up first.
    self.RID = layer_id
    self.relays = {}
    self.last_id = -1
    self.active = True
    self.running = True
    self.calling_lock = threading.Lock()
    self.timeout_period = ModuleConfig.RELAY_LAYER_TIMEOUT_PERIOD
    self.logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL,
                                          "relay_layer_{}".format(str(self.RID).replace(':', '_')))
    self.relay_windows = {}
    self.transmit_count = {}
    self.buffer_put_actions = deque()

    # Worker threads (daemon threads, so they never keep the interpreter alive).
    self.buffer_put_thread = threading.Thread(target=self._start_buffer_put_thread, daemon=True)
    self.buffer_put_thread.start()
    self.timeout_thread = threading.Thread(target=self._start_timeout_thread, daemon=True)
    self.timeout_thread.start()
    self.message_handling_thread = threading.Thread(target=self._start_message_handling_thread, daemon=True)
    self.message_handling_thread.start()

    self._call_link_layer_message("register_relay_layer", [layer_id])
def _start_message_handling_thread(self):
    """Dispatch the messages of the message buffer until the RelayLayer stops running.

    A TransmitMessage is passed to handle_transmit. The action of a LayerMessage is passed to the handler that
    belongs to its class. Messages or actions of any other class are logged as errors.
    """
    self.logger.debug("Start message handling thread")
    while self.running:
        try:
            # Block for at most 0.1 seconds: the loop does not spin on an empty buffer and still notices
            # promptly when running is set to False.
            message = self.message_buffer.get(True, 0.1)
        except queue.Empty:
            continue
        try:
            self.logger.debug("Message buffer size: {}".format(self.message_buffer.qsize()))
        except NotImplementedError:
            # qsize() is not implemented on every platform; the size is only used for logging.
            pass
        if isinstance(message, TransmitMessage):
            self.handle_transmit(message.message)
        elif isinstance(message, LayerMessage):
            action = message.action
            # determine action
            if isinstance(action, ProbeFailAction):
                self.handle_probe_fail(action.key, action.key_sequence)
            elif isinstance(action, PingAction):
                for ping in action.pings:
                    self.handle_ping(ping.relay_id, ping.level, ping.sink_rid, ping.key)
            elif isinstance(action, InRelayClosedAction):
                self.handle_in_relay_closed(action.keys, action.sender_rid, action.relay_id)
            elif isinstance(action, OutRelayClosedAction):
                self.handle_out_relay_closed(action.relay_id)
            elif isinstance(action, NotAuthorizedAction):
                self.handle_not_authorized(action.keys, action.out_id)
            else:
                self.logger.error("No supported class found for message {}".format(message))
        else:
            self.logger.error("No supported class found for message {}".format(message))
    self.logger.debug("Stopped Relay Layer message handling Thread")
def _call_link_layer_message(self, function, args, needs_response=False):
    """Call a method inside the LinkLayer process through the call pipe.

    The call is sent as the list [function, needs_response, args]. Access to the pipe is serialized with
    calling_lock.

    Args:
        function (str): Name of the method that should be called.
        args (list): Arguments of the call.
        needs_response (bool): If True the method waits up to 0.1 seconds for the answer and returns it.

    Returns:
        The object answered through the pipe, or None if no response was requested, the pipe is closed, or
        no answer arrived in time.
    """
    if self.link_layer_call_pipe is None:
        return None
    with self.calling_lock:
        pipe = self.link_layer_call_pipe
        if pipe is None:
            # Closed by another caller while this one was waiting for the lock.
            return None
        call = [function, needs_response, args]
        try:
            if needs_response:
                # Discard answers of earlier calls that arrived after their timeout, so that they are not
                # mistaken for the answer of this call.
                while pipe.poll():
                    pipe.recv()
            pipe.send(call)
        except Exception as e:
            print("EXCEPTION in send message {} {}".format(call, e))
        if not needs_response:
            return None
        try:
            # poll() only tells whether data is available; the answer itself has to be read with recv().
            if pipe.poll(0.1):
                return pipe.recv()
        except EOFError:
            print("EOFERROR!!!!")
            pipe.close()
            self.link_layer_call_pipe = None
        except Exception as e:
            print("EXCEPTION in receive {}".format(e))
        return None
def get_key_for_layer(self):
    """Generate an authentication key for this RelayLayer.

    The key is generated by KeyGeneration.generate_key from the id of this layer.

    Returns:
        str: A key used for authenticating a connection.
    """
    layer_key = KeyGeneration.generate_key(self.RID)
    return layer_key
def is_sink(self, relay_id: RelayId):
    """Check whether a relay is a sink relay.

    Args:
        relay_id (RelayId): The RelayId representing the Relay that should be checked.

    Returns:
        bool: True if the relay is stored in this layer and is a sink relay, False otherwise.
    """
    relay = self.relays.get(relay_id)
    if relay is None:
        # Unknown relays are reported as False (not None), like is_dead and same_target do.
        return False
    return relay.is_sink()
def has_incoming(self, relay_id: RelayId):
    """Report the incoming connections of a relay.

    Args:
        relay_id (RelayId): The RelayId representing the Relay that should be checked.

    Returns:
        The result of the has_incoming method of the relay, or None if the relay is not stored in this layer.
    """
    try:
        target = self.relays[relay_id]
    except KeyError:
        return None
    return target.has_incoming()
def is_direct(self, relay_id: RelayId):
    """Check whether a relay is a direct relay.

    Args:
        relay_id (RelayId): The RelayId representing the Relay that should be checked.

    Returns:
        bool: True if the relay is stored in this layer and is a direct relay, False otherwise.
    """
    relay = self.relays.get(relay_id)
    if relay is None:
        # Unknown relays are reported as False (not None), like is_dead and same_target do.
        return False
    return relay.is_direct()
def same_target(self, relay_id_1: RelayId, relay_id_2: RelayId):
    """Check whether two relays have the same outgoing connection.

    Args:
        relay_id_1 (RelayId): The first relay that should be checked.
        relay_id_2 (RelayId): The second relay that should be checked.

    Returns:
        bool: The result of comparing both relays with the same_target method of the first relay; False if one
        of the relays is not stored in this layer.
    """
    known = self.relays
    if relay_id_1 not in known or relay_id_2 not in known:
        return False
    first, second = known[relay_id_1], known[relay_id_2]
    return first.same_target(second)
def is_dead(self, relay_id: RelayId):
    """Check whether a relay is dead.

    Args:
        relay_id (RelayId): The RelayId representing the Relay that should be checked.

    Returns:
        bool: The result of the is_dead method of the relay; False if the relay is not stored in this layer.
    """
    try:
        target = self.relays[relay_id]
    except KeyError:
        return False
    return target.is_dead()
def get_level(self, relay_id: RelayId):
    """Look up the connection level of a relay.

    Args:
        relay_id (RelayId): The RelayId representing the Relay that should be checked.

    Returns:
        int: The level stored in the relay, or None if the relay is not stored in this layer.
    """
    if relay_id not in self.relays:
        return None
    return self.relays[relay_id].level
def get_sink_node_id(self, relay_id: RelayId):
    """Look up the node id of the node holding the sink relay of the connection defined by the given relay.

    Args:
        relay_id (RelayId): The RelayId representing the Relay that should be checked.

    Returns:
        int: The node_id of the sink_rid stored in the relay, or None if the relay is not stored in this layer.
    """
    if relay_id not in self.relays:
        return None
    sink_rid = self.relays[relay_id].sink_rid
    return sink_rid.node_id
def _get_relay(self, relay_id: RelayId):
    """Return the Relay stored under the given RelayId, or None if this layer holds no such relay."""
    return self.relays.get(relay_id)
def _start_buffer_put_thread(self):
    """Forward the entries of buffer_put_actions to the LinkLayer until the RelayLayer stops running.

    Every entry is a pair (target, action). If the target is a RelayLayerId the entry is only forwarded when
    the id equals the id of this layer. If the target is a Relay the entry is only forwarded while the relay
    is still stored in this layer. Entries with any other target are logged and dropped.
    """
    while self.running:
        if not self.buffer_put_actions:
            # Nothing queued: sleep briefly instead of spinning in a tight loop.
            time.sleep(0.001)
            continue
        self.logger.debug("Put actions buffer len: {}".format(len(self.buffer_put_actions)))
        message = self.buffer_put_actions.popleft()
        buffer = message[0]
        action = message[1]
        self.logger.debug(action)
        try:
            if isinstance(buffer, RelayLayerId):
                if buffer == self.RID:
                    self.link_layer_buffer.put_nowait(message)
            elif isinstance(buffer, Relay):
                if buffer.relay_id in self.relays:
                    send_message = [buffer, action]
                    self.link_layer_buffer.put_nowait(send_message)
            else:
                self.logger.warning("Not right type in put thread: {}".format(message))
        except BrokenPipeError:
            self.logger.warning("BrokenPipeErrror")
def _start_timeout_thread(self):
    """Execute the timeout method periodically until the RelayLayer stops running.

    After every execution the thread sleeps for timeout_period seconds.
    """
    while True:
        if not self.running:
            break
        self.timeout()
        time.sleep(self.timeout_period)
def new_relay(self):
    """Create a new relay inside this RelayLayer.

    The RelayLayer creates a new Relay object with the next free relay id, stores it in the layer and
    provides the RelayId of it.

    Returns:
        RelayId: The RelayId of the new Relay object.
    """
    created = self.get_new_relay_object()
    self.add_relay_to_layer(created)
    return created.relay_id
def get_new_relay_object(self):
    """Create a new Relay object without storing it in the RelayLayer.

    The relay gets the next free relay id of this layer, but it is only returned and not added to the layer.
    This is needed to create connections between Nodes. The Relay can be added to the RelayLayer later with
    the add_relay_to_layer method.

    Returns:
        Relay: A new Relay object.
    """
    self.last_id += 1
    return Relay(RelayId(self.RID, self.last_id))
def get_relay_by_relay_id(self, relay_id):
    """Look up the Relay object that is stored under the given RelayId.

    This should only be used for analysing purposes. Normally one should not have access to the relay object
    from outside.

    Args:
        relay_id (RelayId): The RelayId of the Relay that should be returned.

    Returns:
        Relay: The Relay object stored under the given RelayId, or None if the RelayLayer holds no such relay.
    """
    return self.relays.get(relay_id)
def add_relay_to_layer(self, new_relay: Relay):
    """Store a Relay object in the RelayLayer.

    Objects that are not a Relay are ignored. A relay whose relay id is already present in the layer is
    ignored as well, so an existing relay is never overwritten.

    Args:
        new_relay (Relay): The Relay object that should be added to the layer.
    """
    if not isinstance(new_relay, Relay):
        return
    if new_relay.relay_id in self.relays:
        return
    self.relays[new_relay.relay_id] = new_relay
def timeout(self):
    """The timeout method of the RelayLayer.

    The method is executed periodically by the timeout thread. For every relay stored in this layer it checks
    the stored information, removes relays and in-references that are found to be invalid, sends pings to the
    layers holding in-references and sends a probe over every relay that has an outgoing connection. When the
    layer is not active anymore and no relay is left, the layer is shut down.

    For further information see the thesis of the library or the phd thesis of Setzer.
    """
    self.logger.debug("Relay length stored: {}".format(len(self.relays)))
    self.logger.debug(f"Relays: [{RelayLogging.relay_list_to_string(self.relays.values())}]")
    # Work on a snapshot, because relays are removed from self.relays during the iteration.
    for relay in self.relays.copy().values():
        self.logger.debug(f"Timeout by {self.RID} for {relay}")
        relay_id = relay.relay_id
        self.monitor_timeout_transmit_rate(relay)
        if ModuleConfig.DOS_DETECTION_ACTIVATED:
            dos_detected = self.check_dos_attack(relay)
            if dos_detected:
                self.logger.error("Denial of Service attack detected in relay {}".format(relay))
                self.delete(relay.relay_id)
                continue
        # A relay whose id was not created by this layer is deleted.
        if relay_id.layer_id != self.RID:
            self._timeout_delete_subroutine(relay)
            continue
        # correct level of relay
        if relay.out_relay.out_id is None:
            relay.level = 0
        elif relay.level < 1:
            relay.level = 1
        # has out id or out id is a relay layer id
        out_ref = relay.out_relay
        # NOTE(review): "and" binds tighter than "or" here, and out_id.layer_id is read before out_id is
        # checked to be a RelayId -- confirm that this is the intended condition.
        if not isinstance(out_ref, OutReference) or out_ref.out_id is not None \
                and not isinstance(out_ref.out_id.layer_id, RelayLayerId) \
                and not isinstance(out_ref.out_id, RelayId):
            self._timeout_delete_subroutine(relay)
            continue
        # Keys without an out id are meaningless, an out id without keys cannot be used.
        if out_ref.out_id is None and len(out_ref.keys) > 0:
            out_ref.keys = set()
        if out_ref.out_id is not None and len(out_ref.keys) == 0:
            self._timeout_delete_subroutine(relay)
            continue
        remove_refs = []
        # correct in references
        for in_ref in relay.in_relays:
            in_ref_key = in_ref.key
            needs_deletion = not KeyGeneration.check_key_origin(in_ref_key, self.RID)
            # check same in references for same key and different rids and relays
            if not needs_deletion:
                for check_in_ref in relay.in_relays:
                    if in_ref_key == check_in_ref.key and in_ref.rid != check_in_ref.rid \
                            and in_ref.relay != check_in_ref.relay:
                        needs_deletion = True
                        break
            # check other relays for the key
            if not needs_deletion:
                for check_relay in self.relays.copy().values():
                    if check_relay != relay and check_relay.has_key_in_in_ref(in_ref_key):
                        needs_deletion = True
                        break
            if needs_deletion:
                print("NEEDS DELETION!!")
                remove_refs += [in_ref]
        for in_ref in remove_refs:
            relay.remove_in_reference(in_ref)
        # ping all relay layers that has an in reference; the pings are grouped per relay layer
        pings = {}
        for in_ref in relay.in_relays:
            if in_ref.key != "" and in_ref.rid is not None and in_ref.relay is None:
                action = Ping(relay.relay_id, relay.level, relay.sink_rid, in_ref.key)
                if in_ref.rid in pings:
                    pings[in_ref.rid].append(action)
                else:
                    pings[in_ref.rid] = [action]
        for rid, ping_list in pings.items():
            ping_container = PingAction(ping_list)
            layer_message = LayerMessage(rid, ping_container)
            self.buffer_put_actions.append((self.RID, layer_message))
        remove_refs = []
        for in_ref in relay.in_relays:
            # NOTE(review): with "and" an InReference is never removed here, whatever check_valid()
            # returns -- "or" may have been intended; confirm before changing.
            if not isinstance(in_ref, InReference) and not in_ref.check_valid():
                remove_refs += [in_ref]
        for in_ref in remove_refs:
            relay.remove_in_reference(in_ref)
        if not relay.alive:
            relay.clear_in_relays()
            if relay.out_relay.out_id is None:
                self._remove_relay_completely(relay)
                continue
            else:
                if not self._has_relay_for_in_relay(relay):
                    # Tell the layer of the outgoing connection that this relay is closed.
                    action = InRelayClosedAction(relay.out_relay.keys, self.RID, relay.out_relay.out_id)
                    layer_message = LayerMessage(relay.out_relay.out_id.layer_id, action)
                    self.buffer_put_actions.append((self.RID, layer_message))
                    self._remove_relay_completely(relay)
                    continue
                else:
                    self.logger.debug("Cannot remove relay because has relay in relay")
        # While the layer is stopping, relays are removed once the LinkLayer reports an empty buffer.
        if not self.active and (out_ref.out_id is None or len(relay.in_relays) == 0
                                and not self._has_relay_for_in_relay(relay)):
            if self._call_link_layer_message("is_buffer_empty", [relay.relay_id], True):
                self._remove_relay_completely(relay)
                continue
        elif not self.active:
            self.logger.debug("Cannot remove relay {}".format(relay))
        # The relay is deleted when a validated relay with a higher relay id shares an out key with it.
        superseded = False
        for check_relay in self.relays.copy().values():
            if not check_relay.validated:
                continue
            check_out_keys = check_relay.out_relay.keys
            relay_out_keys = out_ref.keys
            intersect = relay_out_keys.intersection(check_out_keys)
            if check_relay != relay and len(intersect) > 0:
                if check_relay.relay_id.relay_id > relay.relay_id.relay_id:
                    self._timeout_delete_subroutine(relay)
                    superseded = True
                    break
        if superseded:
            # The relay was deleted: go on with the next relay instead of probing the deleted one.
            continue
        key_sequence = [out_ref.keys]
        control_keys = []
        # Collect the keys of pending in-references that were sent over this relay and are not in the
        # relay buffer anymore.
        for check_relay in self.relays.copy().values():
            for check_in_ref in check_relay.in_relays:
                if check_in_ref.rid is None and check_in_ref.relay == relay and check_in_ref.key != "":
                    check_key = check_in_ref.key
                    is_in_buffer = self._call_link_layer_message("check_key_in_relay_buffer",
                                                                 [check_relay.relay_id, check_key], True)
                    if not is_in_buffer:
                        control_keys.append(check_key)
        if (self.active or len(relay.in_relays) > 0 or len(control_keys) > 0) and out_ref.out_id is not None:
            header = Header(out_ref.keys, relay_id.layer_id, out_ref.out_id)
            action = ProbeAction(control_keys, key_sequence)
            message = Message(header, action)
            transmit_message = TransmitMessage(message)
            self.buffer_put_actions.append((relay, transmit_message))
    if not self.active and len(self.relays.values()) == 0:
        self.shutdown()
def _timeout_delete_subroutine(self, relay: Relay):
    """Delete a relay during the timeout.

    The relay is passed to delete, then every relay of this layer drops its in-references to it, and finally
    _remove_relay_completely is called for it.
    """
    self.delete(relay.relay_id)
    snapshot = self.relays.copy()
    for other in snapshot.values():
        other.remove_in_reference_by_relay(relay)
    self._remove_relay_completely(relay)
def delete(self, relay_id: RelayId):
    """Delete a relay.

    The relay is marked as not alive. Every relay layer that holds an in-reference to it is informed once
    with an OutRelayClosedAction, then the in-references of the relay are cleared. A sink relay is removed
    from the layer right away. Unknown relay ids are ignored.

    For further information see the thesis of the library or the phd thesis of Setzer.

    Args:
        relay_id (RelayId): The RelayId of the Relay that should be deleted.
    """
    self.logger.debug("Delete call for relay {} \n Stack: ".format(relay_id))
    if relay_id not in self.relays:
        return
    target = self.relays[relay_id]
    target.alive = False
    notified = []
    for ref in target.in_relays:
        is_remote = ref.key is not None and ref.rid is not None and ref.relay is None
        if not is_remote or ref.rid in notified:
            continue
        closed = OutRelayClosedAction(target.relay_id)
        self.buffer_put_actions.append((self.RID, LayerMessage(ref.rid, closed)))
        notified.append(ref.rid)
    target.clear_in_relays()
    if target.is_sink():
        self._remove_relay_completely(target)
def merge(self, relays):
    """Merge method of the RelayLayer.

    All given relays are merged into the first one: it receives the out keys of all of them, references that
    other relays hold to the merged relays are redirected to it, and the remaining relays are removed. The
    merge fails if a relay is unknown, if the first relay is not alive, if the relays differ in out id, level,
    alive state or sink, if one of them has in-references, or if one of their out keys is also used by a relay
    that is not part of the merge.

    For further information see the thesis of the library or the phd thesis of Setzer.

    Args:
        relays (list): A list of relay ids that should be merged to one relay. The list is not modified.

    Returns:
        RelayId or None: The RelayId of the merged Relay or None if merge was not successful.
    """
    if len(relays) <= 1:
        return None
    # Resolve the ids into a private list, so that the list of the caller is left untouched.
    candidates = []
    for relay_id in relays:
        if relay_id not in self.relays:
            self.logger.warning("Merge failed relay {} not in relays".format(relay_id))
            return None
        candidates.append(self.relays[relay_id])
    merge_relay = candidates[0]
    common_out_id = merge_relay.out_relay.out_id
    common_level = merge_relay.level
    common_alive = merge_relay.alive
    common_sink_rid = merge_relay.sink_rid
    if not common_alive:
        self.logger.warning(f"Merge failed because not alive common")
        return None
    # Out keys of all relays that are not part of the merge.
    relay_list_diff = [element.out_relay.keys for element in self.relays.copy().values() if
                       element not in candidates]
    out_keys = set()
    for relay in candidates:
        out_reference = relay.out_relay
        if out_reference.out_id != common_out_id or relay.level != common_level or relay.alive != common_alive \
                or relay.sink_rid != common_sink_rid or len(relay.in_relays) > 0:
            self.logger.warning("Merge failed because first check {} {}".format(merge_relay, relay))
            return None
        for key in out_reference.keys:
            out_keys.add(key)
            for diff_keys in relay_list_diff:
                if key in diff_keys:
                    self.logger.warning(f"Merge failed because key in diff keys: {diff_keys}")
                    return None
    merge_relay.validated = False
    merge_relay.out_relay.keys = out_keys
    merge_relay.out_relay.out_id = common_out_id
    merge_relay.level = common_level
    merge_relay.sink_rid = common_sink_rid
    # merge in references
    for relay in candidates:
        if relay == merge_relay:
            continue
        for diff_relay in self.relays.copy().values():
            if diff_relay in candidates:
                continue
            diff_relay.replace_relay_in_references(relay, merge_relay)
        self._remove_relay_completely(relay)
    self.relays[merge_relay.relay_id] = merge_relay
    return merge_relay.relay_id
def get_relays(self):
    """Collect the RelayIds of all alive Relays present in the RelayLayer.

    Returns:
        list: A list of the RelayIds of the relays whose alive flag is set.
    """
    alive_ids = []
    for relay in self.relays.copy().values():
        if relay.alive:
            alive_ids.append(relay.relay_id)
    return alive_ids
def get_validated_relays(self):
    """Collect the RelayIds of all alive and validated Relays present in the RelayLayer.

    Returns:
        list: A list of the RelayIds of the relays whose alive flag and validated flag are both set.
    """
    validated_ids = []
    for relay in self.relays.copy().values():
        if not relay.alive:
            continue
        if relay.validated:
            validated_ids.append(relay.relay_id)
    return validated_ids
def check_relay_exists(self, relay_id: RelayId):
    """Check whether a Relay is stored in the RelayLayer.

    Args:
        relay_id (RelayId): The RelayId representing the Relay that should be checked.

    Returns:
        bool: True if a Relay is stored under the given RelayId, False otherwise.
    """
    known_relays = self.relays
    return relay_id in known_relays
def validate_relay(self, relay_id: RelayId):
    """Mark a Relay as validated.

    The validated flag of the Relay is set to True. Unknown relay ids are ignored.

    Args:
        relay_id (RelayId): The RelayId of the Relay that should be validated.
    """
    if relay_id not in self.relays:
        return
    self.relays[relay_id].validated = True
def stop(self):
    """Stop the Relay Layer.

    Every relay that is currently stored in the layer is passed to delete. Afterwards the active state is
    set to False.
    """
    snapshot = self.relays.copy()
    for relay in snapshot.values():
        self.delete(relay.relay_id)
    self.active = False
def shutdown(self):
    """Shutdown the RelayLayer completely without removing Relay connections.

    The running state is set to False, the stored relays are dropped and the buffer put thread is joined.
    Then the LinkLayer is asked to shut down. If its process has not exited after three seconds it is killed.
    """
    self.running = False
    self.relays.clear()
    self.buffer_put_thread.join()
    self._call_link_layer_message("shutdown", [])
    link_process = self.link_layer_process
    link_process.join(3)
    still_running = link_process.exitcode is None
    if still_running:
        link_process.kill()
def send(self, send_relay_id: RelayId, action: Action):
    """Send an action over a specific relay.

    On a sink relay the action itself is queued for the relay. On any other relay every RelayId parameter of
    the action that is stored in this layer is replaced by a RelayParameter with a freshly generated key, and
    the action is queued as a TransmitMessage. Sending validates the relay.

    For further information see the thesis of this library.

    Args:
        send_relay_id (RelayId): The RelayId representing the Relay which should transmit the action.
        action (Action): The action that should be transmitted.

    Returns:
        bool: True if the action was queued, False if the relay is unknown or not alive or if the given
        object is not an Action.
    """
    if send_relay_id not in self.relays:
        return False
    relay = self.relays[send_relay_id]
    if not self.running:
        # Only a warning: the action is still queued below.
        self.logger.warning("Cannot send relay layer not running anymore")
    if not isinstance(action, Action):
        return False
    if not relay.alive:
        print("RELAY {}, {} NOT ALIVE!!".format(relay.relay_id, relay.sink_rid))
        self.logger.warning("Cannot send on not alive relay {}".format(relay.relay_id))
        return False
    if not relay.validated:
        relay.validated = True
    # Relay is sink relay and should send the action to node
    if relay.out_relay.out_id is None:
        self.buffer_put_actions.append((relay, action))
        return True
    for index, parameter in enumerate(action.parameters):
        if not isinstance(parameter, RelayId):
            continue
        if parameter not in self.relays:
            print("ERRROR2")
            continue
        parameter_relay = self.relays[parameter]
        key = KeyGeneration.generate_key(self.RID)
        # The in-reference stays bound to the sending relay until the receiver answers with this key.
        new_in_ref = InReference(key, None, relay)
        parameter_relay.add_in_reference(new_in_ref)
        relay_parameter = RelayParameter(key, parameter_relay.relay_id,
                                         parameter_relay.level + 1,
                                         parameter_relay.sink_rid)
        action.parameters[index] = relay_parameter
    header = Header(relay.out_relay.keys, relay.relay_id.layer_id, relay.out_relay.out_id)
    message = Message(header, action)
    transmit_message = TransmitMessage(message)
    self.logger.debug(f"Send call with {transmit_message}\n")
    self.buffer_put_actions.append((relay, transmit_message))
    return True
def handle_transmit(self, message):
    """The method is called when a transmit message is received by the RelayLayer.

    If the header of the message belongs to an authorized relay, the message is either delivered at a sink
    relay or forwarded over the outgoing connection of the relay. Otherwise the sender is informed with a
    NotAuthorizedAction or an OutRelayClosedAction. Objects that are not a Message are ignored.

    For further information see the thesis of this library.

    Args:
        message (Message): The message that got transmitted.
    """
    if not isinstance(message, Message):
        return
    header = message.header
    authorized, relay = self._get_active_relay_for_header(header)
    self.logger.debug(f"Transmit message with header {header}\n and message {message}")
    if relay and authorized:
        self.monitor_transmit(message, relay)
        header_keys = header.keys
        header_sender_rid = header.sender_rid
        # Activate pending in-references: a reference that was bound to a local relay becomes bound to the
        # relay layer that now uses its key.
        for key in header_keys:
            for in_ref in relay.in_relays:
                if in_ref.key == key and in_ref.rid is None and in_ref.relay is not None \
                        and in_ref.relay.sink_rid == header_sender_rid:
                    in_ref.relay = None
                    in_ref.rid = header_sender_rid
        valid_relay_keys = relay.get_valid_keys(header_sender_rid)
        false_keys = [key for key in header_keys if key not in valid_relay_keys]
        # some keys are not valid and should be corrected
        if len(false_keys) > 0:
            self.logger.debug("Some keys were invalid: {}\n".format(', '.join(map(str, false_keys))))
            self.logger.debug("Send not Authorized in false keys")
            action = NotAuthorizedAction(false_keys, header.out_id)
            layer_message = LayerMessage(header_sender_rid, action)
            self.buffer_put_actions.append((self.RID, layer_message))
        if relay.out_relay.out_id is None:
            # relay is a sink relay and has no out reference
            if isinstance(message.action, ProbeAction):
                control_keys = message.action.control_keys
                key_sequence = message.action.key_sequence
                for control_key in control_keys:
                    if not self._check_if_relay_has_out_key(control_key):
                        key_k = key_sequence[-1]
                        # check if probe fails
                        rids = []
                        for in_ref in relay.in_relays:
                            if in_ref.key in key_k and in_ref.relay is None and in_ref.rid is not None:
                                rids += [in_ref.rid]
                        if len(rids) > 0:
                            # The failure is reported to the first relay layer that was found.
                            send_rid = rids[0]
                            action = ProbeFailAction(control_key, key_sequence)
                            layer_message = LayerMessage(send_rid, action)
                            self.buffer_put_actions.append((self.RID, layer_message))
            elif self._check_all_parameter_ids_belong_to_same_rid(message):
                action = message.action
                replace_parameter_list = []
                for index, parameter in enumerate(action.parameters):
                    if isinstance(parameter, RelayParameter):
                        parameter_key = parameter.key
                        ref_relay = self._get_relay_with_out_key(parameter_key)
                        if parameter.level > 0 and ref_relay is None:
                            # Create a local relay for the received relay parameter.
                            self.last_id += 1
                            new_relay = Relay(RelayId(self.RID, self.last_id))
                            new_relay.out_relay.keys.add(parameter_key)
                            new_relay.out_relay.out_id = parameter.relay_id
                            new_relay.level = parameter.level
                            new_relay.sink_rid = parameter.rid
                            new_relay.validated = False
                            probe_action = ProbeAction([], [parameter_key])
                            new_header = Header({parameter_key},
                                                new_relay.relay_id.layer_id, parameter.relay_id)
                            new_message = Message(new_header, probe_action)
                            transmit_message = TransmitMessage(new_message)
                            # Register the relay before its probe is queued: the buffer put thread drops
                            # messages of relays that are not stored in self.relays.
                            self.add_relay_to_layer(new_relay)
                            self.buffer_put_actions.append((new_relay, transmit_message))
                            replace_parameter_list += [(index, new_relay.relay_id)]
                        else:
                            replace_parameter_list += [(index, None)]
                for index, replace_element in replace_parameter_list:
                    action.parameters[index] = replace_element
                self.buffer_put_actions.append((relay, action))
            else:
                self.logger.error("Message is corrupted: {}\n".format(message))
        else:
            # message gets forwarded to next relay
            action = message.action
            if isinstance(action, ProbeAction):
                action.key_sequence.append(relay.out_relay.keys)
                control_keys = action.control_keys
                # Control keys that the LinkLayer reports as present in the relay buffer are not
                # forwarded. The list is changed in place, therefore a copy is iterated.
                for control_key in control_keys.copy():
                    if self._call_link_layer_message("check_key_in_relay_buffer", [relay.relay_id,
                                                                                   control_key], True):
                        control_keys.remove(control_key)
            send_header = Header(relay.out_relay.keys, relay.relay_id.layer_id, relay.out_relay.out_id)
            send_message = Message(send_header, action)
            transmit_message = TransmitMessage(send_message)
            self.buffer_put_actions.append((relay, transmit_message))
    elif relay and not authorized:
        self.logger.debug(f"Not authorized for header {header} and relay {relay}")
        action = NotAuthorizedAction(header.keys, header.out_id)
        layer_message = LayerMessage(header.sender_rid, action)
        self.buffer_put_actions.append((self.RID, layer_message))
    elif header.out_id is not None and header.out_id.layer_id == self.RID:
        # The addressed relay belongs to this layer but was not found: tell the sender it is closed.
        action = OutRelayClosedAction(header.out_id)
        layer_message = LayerMessage(header.sender_rid, action)
        self.buffer_put_actions.append((self.RID, layer_message))
    else:
        self.logger.warning("No Relay found for header {}\n".format(header))
def handle_not_authorized(self, keys, out_id):
"""The method is called when a NotAuthorized message is received by the RelayLayer.
For further information see the thesis of this library.
Args:
keys (list): List of keys.
out_id (RelayId): The RelayId that is not authorized.
"""
self.logger.debug(f"Handle not authorized executed with: {keys}, {out_id}")
for key in keys.copy():
if out_id is not None:
relay = self._get_relay_by_out_id_and_out_key(out_id, key)
if relay is not None:
out_ref = relay.out_relay
out_ref.remove_key(key)
if len(out_ref.keys) == 0:
# remove all pending relays sent via r
for check_relay in self.relays.copy().values():
check_relay.remove_in_reference_by_relay(relay)
self.delete(relay.relay_id)
self._remove_relay_completely(relay)
else:
self.logger.warning(
"NOT_AUTHORIZED: No relay found with out id {} and key {}\n".format(out_id, key))
else:
# Message corrupted
self.logger.error("Out id is empty in handle not authorized\n")
def handle_probe_fail(self, key, key_sequence):
"""The method is called when a ProbeFail message is received by the RelayLayer.
For further information see the thesis of this library.
Args:
key (str): Key that failed the Probe message.
key_sequence (list): List of key lists that represent the route the Probe message went.
"""
self.logger.debug(f"Handle probe_fail executed with: {key}, {key_sequence}")
# get relays where some keys from key_sequence[k] are in relay out keys
k = len(key_sequence)
if k == 0:
# no key sequence given
self.logger.error("PROBE_FAIL: Key sequence empty\n")
return
relay = self._get_relay_by_out_key_intersection(key_sequence[k - 1])
if relay is not None:
if k > 1:
# call needs to be passed on
compare_keys = key_sequence[k - 2]
new_key_sequence = key_sequence[:-1]
for in_ref in relay.in_relays:
if in_ref.relay is None and in_ref.rid is not None and in_ref.key in compare_keys:
action = ProbeFailAction(key, new_key_sequence)
layer_message = LayerMessage(in_ref.rid, action)
# self.buffer.put_nowait(layer_message)
self.buffer_put_actions.append((self.RID, layer_message))
break
else:
for diff_relay in self.relays.copy().values():
diff_relay.remove_in_reference_by_relay_and_key(key, relay)
def handle_in_relay_closed(self, keys: list, sender_rid: RelayLayerId, relay_id: RelayId):
    """Process an InRelayClosed message received by the RelayLayer.

    If the relay id is known, the in reference for every given key and the sending RelayLayer is removed
    from that relay. Unknown relay ids are ignored.
    For further information see the thesis of this library.

    Args:
        keys (list): The list of keys that are used in the closed connection.
        sender_rid (RelayLayerId): The RelayLayerId of the RelayLayer that sent the message.
        relay_id (RelayId): The relay id that is closed.
    """
    self.logger.debug(f"Handle in_relay_closed executed with: {keys}, {sender_rid}, {relay_id}")
    if relay_id not in self.relays:
        return
    affected_relay = self.relays[relay_id]
    for closed_key in keys:
        affected_relay.remove_in_reference_by_key_and_rid(closed_key, sender_rid)
def handle_ping(self, relay_id: RelayId, level: int, sink_rid: RelayLayerId, key: str):
    """Process a Ping message received by the RelayLayer.

    If a relay with the given out id and key exists, its sink rid is updated and its level is lowered to
    ``level + 1`` when it is larger. A relay whose level is smaller than ``level + 1`` is deleted, removed
    from the in references of all known relays and removed completely. If no such relay exists, an
    InRelayClosed message addressed to the layer of ``relay_id`` is queued.
    For further information see the thesis of this library.

    Args:
        relay_id (RelayId): The relay id there should be a connection to.
        level (int): The level of the connection.
        sink_rid (RelayLayerId): The sink rid of the connection.
        key (str): The key of the connection
    """
    self.logger.debug(f"Handle ping executed with: {relay_id}, {level}, {sink_rid}, {key}")
    if relay_id is None:
        # Report through the logger like every other handler instead of printing to stdout.
        self.logger.error("No pid given")
        return
    relay = self._get_relay_by_id_and_out_key(relay_id, key)
    if relay is None:
        self.logger.debug(f"NO RELAY FOR PING -> In Relay closed\n with parameter: {relay_id},"
                          f" {level}, {sink_rid}, {key}\n")
        action = InRelayClosedAction([key], self.RID, relay_id)
        layer_message = LayerMessage(relay_id.layer_id, action)
        self.buffer_put_actions.append((self.RID, layer_message))
        return
    relay.sink_rid = sink_rid
    expected_level = level + 1
    if relay.level > expected_level:
        relay.level = expected_level
    if relay.level < expected_level:
        # The relay claims a lower level than the ping allows, so it gets closed.
        self.delete(relay.relay_id)
        for diff_relay in self.relays.copy().values():
            diff_relay.remove_in_reference_by_relay(relay)
        self._remove_relay_completely(relay)
def handle_out_relay_closed(self, relay_id: RelayId):
    """Process an OutRelayClosed message received by the RelayLayer.

    The relay whose out reference points to ``relay_id`` is removed from the in references of all known
    relays, deleted and removed completely. A missing relay or a wrongly typed id is only logged.
    For further information see the thesis of this library.

    Args:
        relay_id (RelayId): The RelayId of the Relay that is closed.
    """
    self.logger.debug(f"Handle out_relay_closed executed with: {relay_id}")
    if not isinstance(relay_id, RelayId):
        self.logger.error(
            "Relay id {} provided in the wrong format. Should be an object of RelayId".format(relay_id))
        return
    closed_relay = self._get_relay_by_out_id(relay_id)
    if closed_relay is None:
        self.logger.warning("OUT_RELAY_CLOSED: No relay with relay id {} found".format(relay_id))
        return
    # remove all "pending" relays sent via the closed relay
    for holder in self.relays.copy().values():
        holder.remove_in_reference_by_relay(closed_relay)
    self.delete(closed_relay.relay_id)
    self._remove_relay_completely(closed_relay)
def _has_relay_for_in_relay(self, relay):
for check_relay in self.relays.copy().values():
for in_ref in check_relay.in_relays:
if in_ref.relay == relay:
# self.logger.debug(f"Cannot send in relay closed because {relay.relay_id} is in {check_relay}")
return True
return False
def _remove_relay_completely(self, relay):
self.logger.debug("Call for remove relay completely for relay {} \n Stack: "
.format(relay.relay_id)) # , traceback.format_stack()))
if relay.relay_id in self.relays:
self.relays.pop(relay.relay_id)
# self.relay_buffers.pop(relay.relay_id)
if relay.relay_id in self.relay_windows:
self.relay_windows.pop(relay.relay_id)
if relay.relay_id in self.transmit_count:
self.transmit_count.pop(relay.relay_id)
self._call_link_layer_message("stop_relay_watch", [relay.relay_id])
def _get_active_relay_for_header(self, header):
    """Look up the relay addressed by a header and check whether the sender may use it.

    Args:
        header (Header): The header of the received message.

    Returns:
        tuple: ``(authorized, relay)``. ``(False, None)`` if ``header`` is no Header object or its out id
        is not a key of ``relays``. ``(True, relay)`` if an in reference of the alive relay matches one of
        the header keys and the sender. ``(False, relay)`` if the relay exists but no in reference matched.
    """
    if not isinstance(header, Header):
        self.logger.error("No header provided to get relay for specific header\n")
        return False, None
    out_id = header.out_id
    keys = header.keys
    sender_rid = header.sender_rid
    # # self.logger.debug(f"Get active relay for header with header: {header}")
    if out_id in self.relays:
        relay = self.relays[out_id]
        if relay.alive:
            # iterate over a copy of the in references
            for in_ref in relay.in_relays.copy():
                # self.logger.debug(f"In reference: {in_ref}")
                in_ref_key = in_ref.key
                in_ref_rid = in_ref.rid
                in_ref_relay = in_ref.relay
                # # self.logger.debug(f"In ref relay: {in_ref_relay}")
                if in_ref_key in keys:
                    # in reference that names the sending RelayLayer directly
                    if in_ref_rid is not None and in_ref_relay is None and in_ref_rid == sender_rid:
                        return True, relay
                    # in reference that goes through a relay of this layer whose sink is the sender
                    elif in_ref_rid is None and in_ref_relay is not None \
                            and in_ref_relay.relay_id.layer_id == self.RID \
                            and in_ref_relay.sink_rid == sender_rid:
                        return True, relay
        # relay with id found but is not authorized
        # NOTE(review): nesting reconstructed; this return is assumed to also cover a relay that is not
        # alive - confirm against the original file.
        return False, relay
    # no relay found
    return False, None
def _get_relay_by_out_id(self, relay_id):
for relay in self.relays.copy().values():
if relay.out_relay.out_id == relay_id:
return relay
return None
def _get_relay_by_id_and_out_key(self, relay_id: RelayId, key):
    """Return the first relay whose out reference points to ``relay_id`` and holds ``key``, or None.

    The lookup is the same as ``_get_relay_by_out_id_and_out_key``, so it is delegated to that method
    instead of keeping a second copy of the loop.

    Args:
        relay_id (RelayId): The out id the out reference of the relay has to point to.
        key: The key that has to be part of the out reference keys.
    """
    return self._get_relay_by_out_id_and_out_key(relay_id, key)
def _get_relay_by_out_id_and_out_key(self, out_id: RelayId, key):
    """Return the first relay whose out reference points to ``out_id`` and holds ``key``, or None."""
    matching = (
        candidate
        for candidate in self.relays.copy().values()
        if candidate.out_relay.out_id == out_id and key in candidate.out_relay.keys
    )
    return next(matching, None)
def _get_relay_by_out_key_intersection(self, keys):
compare_keys = set(keys)
for relay in self.relays.copy().values():
out_keys = set(relay.out_relay.keys)
diff_set = compare_keys.intersection(out_keys)
if len(diff_set) > 0:
return relay
return None
def _check_if_relay_has_out_key(self, key):
for relay in self.relays.copy().values():
# # self.logger.debug(f"Check out key {key} for probing for relay: {relay}")
if key in relay.out_relay.keys:
return True
return False
def _get_relay_with_out_key(self, key):
for relay in self.relays.copy().values():
if key in relay.out_relay.keys:
return relay
return None
def _check_all_parameter_ids_belong_to_same_rid(self, message: Message):
    """Check that the RelayParameter entries of the message action all name the same layer id.

    Parameters that are no RelayParameter objects are ignored.

    Args:
        message (Message): The message whose action parameters get checked.

    Returns:
        bool: False as soon as a RelayParameter with a different layer id is found, otherwise True.
    """
    expected_layer = None
    for candidate in message.action.parameters:
        if not isinstance(candidate, RelayParameter):
            continue
        if expected_layer is None:
            # the first relay parameter defines the layer id all others are compared with
            expected_layer = candidate.relay_id.layer_id
        elif candidate.relay_id.layer_id != expected_layer:
            return False
    return True
# DOS Resistance methods
def monitor_transmit(self, message, relay):
    """Count a transmit for the given relay.

    Messages whose action type is listed in ``ModuleConfig.NO_MONITOR_ACTIONS`` are not counted.
    For further information see thesis of this library.

    Args:
        message (Message): The message that is transmitted.
        relay (Relay): The Relay that transmitted the message.
    """
    if message.action.action_type in ModuleConfig.NO_MONITOR_ACTIONS:
        return
    relay_id = relay.relay_id
    # a relay without a counter starts at zero
    self.transmit_count[relay_id] = self.transmit_count.get(relay_id, 0) + 1
def monitor_timeout_transmit_rate(self, relay):
    """Move the current transmit count of the relay into its window and reset the count.

    Relays without a transmit count are skipped. The window is created on first use as a deque that
    holds at most ``ModuleConfig.WINDOW_SIZE`` entries.
    For further information see thesis of this library.

    Args:
        relay (Relay): The Relay for which the rate should be inserted.
    """
    relay_id = relay.relay_id
    if relay_id not in self.transmit_count:
        return
    if relay_id not in self.relay_windows:
        self.relay_windows[relay_id] = deque(maxlen=ModuleConfig.WINDOW_SIZE)
    finished_interval = self.transmit_count[relay_id]
    self.transmit_count[relay_id] = 0
    self.relay_windows[relay_id].append(finished_interval)
def check_dos_attack(self, relay):
    """Check if a dos attack is likely for the given relay.

    The window of the relay is only evaluated once it holds at least ``ModuleConfig.WINDOW_SIZE`` entries.
    It is split into two halves that are passed to ``ConceptChange.calculate_windows``.
    For further information see thesis of this library.

    Args:
        relay (Relay): The Relay for which the rates should be checked.

    Returns:
        bool: True if the calculated change rate is below the dos threshold of the relay, else False.
    """
    relay_id = relay.relay_id
    if relay_id not in self.relay_windows:
        return False
    samples = list(self.relay_windows[relay_id])
    if len(samples) < ModuleConfig.WINDOW_SIZE:
        return False
    half = len(samples) // 2
    first_half = samples[:half]
    second_half = samples[half:]
    change_rate = ConceptChange.calculate_windows(first_half, second_half)
    return True if change_rate < relay.dos_threshold else False
Classes
class RelayLayer (layer_id, node_queue)-
Represents the implementation of the Relay Layer in the relay model.
The RelayLayer class handles all connections over Relays and tries to stabilize the Relay information by communicating with other RelayLayers. It starts a LinkLayer inside a separate process to send messages over Relays. Furthermore it starts three threads. The first thread sends messages to the LinkLayer. The second thread is the timeout thread which periodically executes the timeout method of the RelayLayer. The last thread watches the message buffer.
For further information about the RelayLayer see the thesis of this library.
Attributes
link_layer_buffer:multiprocessing.Queue- Stores the buffer of the LinkLayer where messages should be inserted. These messages get inserted into the right buffer and are processed by the LinkLayer. The objects that are inserted are always a tuple of data. The first entry of the tuple is either a Relay object if the message should be inserted into a relay buffer or a RelayLayerId when the message should be sent to another RelayLayer.
link_layer_call_pipe:multiprocessing.Pipe- Holds the communication pipe of the LinkLayer. This pipe is used to call methods inside the LinkLayer process. A method can be called with the internal method _call_link_layer_message.
message_buffer:multiprocessing.Queue- This buffer holds messages that need to be processed by this RelayLayer. It gets filled by the LinkLayer if it receives a message.
link_layer_process:multiprocessing.Process- Holds the LinkLayer process executing the LinkLayer.
RID:RelayLayerId- Holds the id defining this specific relay layer.
relays:dict- Holds a dict of relays. The key is the RelayId of the specific relay and the value the Relay object.
last_id:int- Stores the last relay id used for the creation of a new relay. This is counted up when creating a new relay.
active:bool- Holds the active state of the RelayLayer. If this is set to False the RelayLayer tries to shutdown. By default this is set to True.
running:bool- Holds the running state of the RelayLayer. If this is set to False the RelayLayer is stopped and all threads are shutting down.
calling_lock:threading.Lock- Holds a Lock object to prevent simultaneous access to the link_layer_call_pipe.
logger:logging.Logger- Holds the Logger object of this class.
buffer_put_actions:deque- Defines a buffer where every message that needs to be inserted in a buffer gets inserted. The buffer_put_thread sends these messages to the LinkLayer.
buffer_put_thread:threading.Thread- The thread removes every message contained in the buffer_put_actions and sends it to the LinkLayer with the link_layer_buffer.
message_handling_thread:threading.Thread- Holds the thread that executes every action sent to the RelayLayer. The messages are inside the message_buffer attribute.
timeout_thread:threading.Thread- Holds the timeout thread where the RelayLayer periodically executes the timeout method.
relay_windows:dict- Stores the windows of each relay for DoS attack mitigation. The key is the RelayId and the value is the window for the specific Relay as a deque object.
transmit_count:dict- Stores the transmit count of a specific relay. The key is the RelayId and the value is the count of transmit messages sent over the specific Relay.
By creating a RelayLayer object all attributes get initialized.
For its creation the RelayLayer needs a RelayLayerId that identifies it. It also needs a buffer from a node into which actions that need to be sent to the node get inserted.
Args
layer_id:RelayLayerId- Identifying the RelayLayer and gets stored in the RID attribute.
node_queue:multiprocessing.Queue- Defining the buffer of the node.
Expand source code
class RelayLayer: """Represents the implementation of the Relay Layer in the relay model. The RelayLayer class handles all connections over Relays and tries to stabilize the Relay information by communicating with other RelayLayers. It starts a LinkLayer inside a separated process to send messages over Relays. Furthermore it starts three threads. The first thread sends messages to the LinkLayer. The second Thread is the timeout thread which periodically executes the timeout method of the RelayLayer. The last thread watches the message buffer. For further information about the RelayLayer see the thesis of this library. Attributes: link_layer_buffer (multiprocessing.Queue): Stores the buffer of the LinkLayer where messages should be inserted. This messages will get inserted in the right buffer and will be processed from the LinkLayer. The objects that are inserted are always a tuple of data. The first entry of the tuple is either a Relay object if the message should be inserted into a relay buffer or a RelayLayerId when the message should be sent to another RelayLayer. link_layer_call_pipe (multiprocessing.Pipe): Holds the communication pipe of the LinkLayer. This pipe is used to call methods inside the LinkLayer process. A method can be called with the internal method _call_link_layer_message. message_buffer (multiprocessing.Queue): This buffer holds messages that needs to be processed by this RelayLayer. It gets filled by the LinkLayer if it receives a message. link_layer_process (multiprocessing.Process): Holds the LinkLayer process executing the LinkLayer. RID (RelayLayerId): Holds the id defining this specific relay layer. relays (dict): Holds a dict of relays. The key is the RelayId of the specific relay and the value the Relay object. last_id (int): Stores the last id relay id for creation of a new relay. This is counted up when creating a new relay. active (bool): Holds the active state of the RelayLayer. 
If this is set to False the RelayLayer tries to shutdown. By default this is set to True. running (bool): Holds the running state of the RelayLayer. If this is set to False the RelayLayer is stopped and all threads are shutting down. calling_lock (threading.Lock): Holds a Lock object to prevent simultaneous access to the link_layer_call_pipe. logger (logging.Logger): Holds the Logger object of this class. buffer_put_actions (deque): Defines a buffer where every message that needs to be inserted in a buffer gets inserted. The buffer_put_thread is sending this messages to the LinkLayer. buffer_put_thread (threading.Thread): The thread removes every message containing in the buffer_put_actions and sends it to the LinkLayer with the link_layer_buffer. message_handling_thread (threading.Thread): Holds the thread that executes every action sent to the RelayLayer. The messages are inside the message_buffer attribute. timeout_thread (threading.Thread): Holds the timeout thread where the RelayLayer periodically executes the timeout method. relay_windows (dict): Stores the windows of each relay for DoS attack mitigation. The key is the RelayId and the value is the window for the specific Relay as a deque object. transmit_count (dict): Stores the transmit count of a specific relay. The key is the RelayId and the value is the count of transmit messages sent over the specific Relay. """ def __init__(self, layer_id, node_queue): """By creating a RelayLayer object all attributes gets initializes. The RelayLayer needs for creation a RelayLayerId to identify this id. Also i needs a buffer from a node where actions that needs to be sent to the node gets inserted. Args: layer_id (RelayLayerId): Identifying the RelayLayer and gets stored in the RID attribute. node_queue (multiprocessing.Queue): Defining the buffer of the node. 
""" self.link_layer_buffer = multiprocessing.Queue() self.link_layer_call_pipe, link_layer_pipe = multiprocessing.Pipe(True) self.message_buffer = multiprocessing.Queue() self.link_layer_process = multiprocessing.Process(target=start_link_layer, args=(self.link_layer_buffer, self.message_buffer, node_queue, layer_id.ip, layer_id.port, link_layer_pipe)) self.link_layer_process.start() self.RID = layer_id self.relays = {} self.last_id = -1 self.active = True self.running = True self.calling_lock = threading.Lock() self.timeout_period = ModuleConfig.RELAY_LAYER_TIMEOUT_PERIOD self.logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL, "relay_layer_{}".format(str(self.RID).replace(':', '_'))) self.buffer_put_actions = deque() self.buffer_put_thread = threading.Thread(target=self._start_buffer_put_thread, daemon=True) self.buffer_put_thread.start() self.timeout_thread = threading.Thread(target=self._start_timeout_thread, daemon=True) self.timeout_thread.start() self.message_handling_thread = threading.Thread(target=self._start_message_handling_thread, daemon=True) self.message_handling_thread.start() self._call_link_layer_message("register_relay_layer", [layer_id]) self.relay_windows = {} self.transmit_count = {} def _start_message_handling_thread(self): self.logger.debug("Start message handling thread") while self.running: try: message = self.message_buffer.get(False, 0.1) self.logger.debug("Message buffer size: {}".format(self.message_buffer.qsize())) if isinstance(message, TransmitMessage): transmit_message = message.message self.handle_transmit(transmit_message) elif isinstance(message, LayerMessage): action = message.action # determine action if isinstance(action, ProbeFailAction): self.handle_probe_fail(action.key, action.key_sequence) elif isinstance(action, PingAction): for ping in action.pings: self.handle_ping(ping.relay_id, ping.level, ping.sink_rid, ping.key) elif isinstance(action, InRelayClosedAction): self.handle_in_relay_closed(action.keys, 
action.sender_rid, action.relay_id) elif isinstance(action, OutRelayClosedAction): self.handle_out_relay_closed(action.relay_id) elif isinstance(action, NotAuthorizedAction): self.handle_not_authorized(action.keys, action.out_id) else: self.logger.error("No supported class found for message {}".format(message)) except queue.Empty: pass self.logger.debug("Stopped Relay Layer message handling Thread") def _call_link_layer_message(self, function, args, needs_response=False): if self.link_layer_call_pipe is not None: with self.calling_lock: call = [function, needs_response, args] try: self.link_layer_call_pipe.send(call) except Exception as e: print("EXCEPTION in send message {} {}".format(call, e)) if needs_response: try: response = self.link_layer_call_pipe.poll(0.1) except EOFError: print("EOFERROR!!!!") self.link_layer_call_pipe.close() self.link_layer_call_pipe = None except Exception as e: print("EXCEPTION in receive {}".format(e)) else: return response def get_key_for_layer(self): """Gets a authentication key generated by the RelayLayer. Returns: str: A key used for authenticating a connection. """ return KeyGeneration.generate_key(self.RID) def is_sink(self, relay_id: RelayId): """Checks a Relay if it is a sink relay. Args: relay_id (RelayId): The RelayId representing the Relay that should be checked. Returns: bool: True if the specific relay is a sink relay, False otherwise. """ if relay_id in self.relays: check_relay = self.relays[relay_id] return check_relay.is_sink() def has_incoming(self, relay_id: RelayId): """Returns the amount of incoming connections of a given relay. Args: relay_id (RelayId): The RelayId representing the Relay that should be checked. Returns: int: The amount of incoming connections. """ if relay_id in self.relays: check_relay = self.relays[relay_id] return check_relay.has_incoming() def is_direct(self, relay_id: RelayId): """Checks a Relay if it is a direct relay. 
Args: relay_id (RelayId): The RelayId representing the Relay that should be checked. Returns: bool: True if the specific relay is a direct relay, False otherwise. """ if relay_id in self.relays: check_relay = self.relays[relay_id] return check_relay.is_direct() def same_target(self, relay_id_1: RelayId, relay_id_2: RelayId): """Checks if two relays have the same outgoing connection. Args: relay_id_1 (RelayId): The first relay that should be checked. relay_id_2 (RelayId): The second relay that should be checked. Returns: bool: True if the specific relays have the same outgoing connection, False otherwise. """ if relay_id_1 in self.relays and relay_id_2 in self.relays: check_relay1 = self.relays[relay_id_1] check_relay2 = self.relays[relay_id_2] return check_relay1.same_target(check_relay2) return False def is_dead(self, relay_id: RelayId): """Checks a Relay if it is dead. Args: relay_id (RelayId): The RelayId representing the Relay that should be checked. Returns: bool: True if the specific relay is dead, False otherwise. """ if relay_id in self.relays: check_relay = self.relays[relay_id] return check_relay.is_dead() return False def get_level(self, relay_id: RelayId): """Gets the connection level of a relay. Args: relay_id (RelayId): The RelayId representing the Relay that should be checked. Returns: int: The level of the connection. """ if relay_id in self.relays: check_relay = self.relays[relay_id] return check_relay.level return None def get_sink_node_id(self, relay_id: RelayId): """Gets the node id of the node holding the sink relay of the connection defined by the given relay. Args: relay_id (RelayId): The RelayId representing the Relay that should be checked. Returns: int: The node id of the sink relay. 
""" if relay_id in self.relays: check_relay = self.relays[relay_id] return check_relay.sink_rid.node_id return None def _get_relay(self, relay_id: RelayId): if relay_id in self.relays: get_relay = self.relays[relay_id] return get_relay return None def _start_buffer_put_thread(self): while self.running: if len(self.buffer_put_actions) > 0: self.logger.debug("Put actions buffer len: {}".format(len(self.buffer_put_actions))) message = self.buffer_put_actions.popleft() buffer = message[0] action = message[1] self.logger.debug(action) try: if isinstance(buffer, RelayLayerId): if buffer == self.RID: self.link_layer_buffer.put_nowait(message) elif isinstance(buffer, Relay): if buffer.relay_id in self.relays: send_message = [buffer, action] self.link_layer_buffer.put_nowait(send_message) else: self.logger.warning("Not right type in put thread: {}".format(message)) except BrokenPipeError: self.logger.warning("BrokenPipeErrror") # self.logger.warning("Stopped Buffer put Thread") def _start_timeout_thread(self): # self.logger.debug("Start timeout thread") while self.running: self.timeout() time.sleep(self.timeout_period) # self.logger.warning("Stopped Timeout Thread") def new_relay(self): """Gets a new relay reference. The RelayLayer creates a new Relay object and provides the RelayId of it. Returns: RelayId: of the new Relay object. """ self.last_id += 1 relay_id = RelayId(self.RID, self.last_id) new_relay = Relay(relay_id) self.add_relay_to_layer(new_relay) return new_relay.relay_id def get_new_relay_object(self): """Gets a new relay object. The method creates a new Relay in the RelayLayer but instead of adding it to the RelayLayer it just returns the object of it. This is needed to create connections between Nodes. The Relay can later be added to the RelayLayer by executing the add_to_layer method. Returns: Relay: A new Relay object. 
""" self.last_id += 1 relay_id = RelayId(self.RID, self.last_id) new_relay = Relay(relay_id) return new_relay def get_relay_by_relay_id(self, relay_id): """Gets the Relay object of defined by the given RelayId. This should only be used for analysing purposes. Normally one should not have access to the relay object from outside. Args: relay_id (RelayId): The RelayId of the Relay that should be returned. Returns: Relay: The Relay object defined by the given RelayId or None if the Relay is not existent in the RelayLayer. """ if relay_id in self.relays: return self.relays[relay_id] else: return None def add_relay_to_layer(self, new_relay: Relay): """Adds a Relay object to the RelayLayer. The method only accepts Relay object and only adds it to the layer if the relay id is not present in the layer. Args: new_relay (Relay): The Relay object that should be added to the layer. """ if isinstance(new_relay, Relay) and new_relay.relay_id not in self.relays: self.relays[new_relay.relay_id] = new_relay def timeout(self): """The timeout method of the RelayLayer. For further information see the thesis of the library or the phd thesis of Setzer. 
""" self.logger.debug("Relay length stored: {}".format(len(self.relays))) self.logger.debug(f"Relays: [{RelayLogging.relay_list_to_string(self.relays.values())}]") for relay in self.relays.copy().values(): relay_start = time.time() start_time = time.time_ns() self.logger.debug(f"Timeout by {self.RID} for {relay}") relay_id = relay.relay_id # if relay_id not in self.relay_buffers or relay_id not in self.relays: # continue self.monitor_timeout_transmit_rate(relay) if ModuleConfig.DOS_DETECTION_ACTIVATED: dos_detected = self.check_dos_attack(relay) if dos_detected: self.logger.error("Denial of Service attack detected in relay {}".format(relay)) self.delete(relay.relay_id) continue # relay_buffer = self.relay_buffers[relay_id] if relay_id.layer_id != self.RID: self._timeout_delete_subroutine(relay) continue # self.logger.debug(f"Benchmark timeout 1: {time.time_ns() - start_time}") start_time = time.time_ns() # correct level of relay if relay.out_relay.out_id is None: relay.level = 0 elif relay.level < 1: relay.level = 1 # has out id or out id is a relay layer id out_ref = relay.out_relay if not isinstance(out_ref, OutReference) or out_ref.out_id is not None \ and not isinstance(out_ref.out_id.layer_id, RelayLayerId) \ and not isinstance(out_ref.out_id, RelayId): self._timeout_delete_subroutine(relay) continue # self.logger.debug(f"Benchmark timeout 2: {time.time_ns() - start_time}") start_time = time.time_ns() if out_ref.out_id is None and len(out_ref.keys) > 0: out_ref.keys = set() # self.logger.debug("Reset the keys because out id is not set") if out_ref.out_id is not None and len(out_ref.keys) == 0: self._timeout_delete_subroutine(relay) # self.logger.debug("Delete relay for not having key") continue # # self.logger.debug(f"Benchmark timeout 3: {time.time_ns() - start_time}") start_time = time.time_ns() remove_refs = [] # correct in references for in_ref in relay.in_relays: in_ref_key = in_ref.key needs_deletion = not KeyGeneration.check_key_origin(in_ref_key, 
self.RID) # check same in references for same key and different rids and relays if not needs_deletion: for check_in_ref in relay.in_relays: if in_ref_key == check_in_ref.key and in_ref.rid != check_in_ref.rid \ and in_ref.relay != check_in_ref.relay: needs_deletion = True break # check other relays for the key if not needs_deletion: for check_relay in self.relays.copy().values(): if check_relay != relay and check_relay.has_key_in_in_ref(in_ref_key): needs_deletion = True break if needs_deletion: print("NEEDS DELETION!!") remove_refs += [in_ref] for in_ref in remove_refs: relay.remove_in_reference(in_ref) # self.logger.debug(f"Benchmark timeout 4: {time.time_ns() - start_time}") start_time = time.time_ns() # ping all relay layers that has an in reference pings = {} for in_ref in relay.in_relays: if in_ref.key != "" and in_ref.rid is not None and in_ref.relay is None: action = Ping(relay.relay_id, relay.level, relay.sink_rid, in_ref.key) if in_ref.rid in pings: pings[in_ref.rid].append(action) else: pings[in_ref.rid] = [action] for rid, ping_list in pings.items(): ping_container = PingAction(ping_list) layer_message = LayerMessage(rid, ping_container) self.buffer_put_actions.append((self.RID, layer_message)) # self.buffer.put_nowait(layer_message) remove_refs = [] for in_ref in relay.in_relays: if not isinstance(in_ref, InReference) and not in_ref.check_valid(): remove_refs += [in_ref] for in_ref in remove_refs: relay.remove_in_reference(in_ref) # self.logger.debug(f"Benchmark timeout 5: {time.time_ns() - start_time}") start_time = time.time_ns() if not relay.alive: relay.clear_in_relays() if relay.out_relay.out_id is None: self._remove_relay_completely(relay) # self.logger.debug(f"Remove completely in not alive for relay {relay.relay_id}") continue else: if not self._has_relay_for_in_relay(relay): action = InRelayClosedAction(relay.out_relay.keys, self.RID, relay.out_relay.out_id) layer_message = LayerMessage(relay.out_relay.out_id.layer_id, action) # 
self.buffer.put_nowait(layer_message) self.buffer_put_actions.append((self.RID, layer_message)) self._remove_relay_completely(relay) # self.logger.debug(f"Remove completely in not alive and out id is not none for " # f"relay {relay.relay_id}") continue else: self.logger.debug("Cannot remove relay because has relay in relay") # self.logger.debug(f"Benchmark timeout 6: {time.time_ns() - start_time}") start_time = time.time_ns() if not self.active and (out_ref.out_id is None or len(relay.in_relays) == 0 and not self._has_relay_for_in_relay(relay)): if self._call_link_layer_message("is_buffer_empty", [relay.relay_id], True): self._remove_relay_completely(relay) # self.logger.debug("Remove completely in not active and no messages") continue elif not self.active: self.logger.debug("Cannot remove relay {}".format(relay)) # self.logger.debug(f"Benchmark timeout 7: {time.time_ns() - start_time}") start_time = time.time_ns() for check_relay in self.relays.copy().values(): if not check_relay.validated: continue check_out_keys = check_relay.out_relay.keys relay_out_keys = out_ref.keys intersect = relay_out_keys.intersection(check_out_keys) if check_relay != relay and len(intersect) > 0: if check_relay.relay_id.relay_id > relay.relay_id.relay_id: # self.logger.debug("out keys intersect: {}{}{}".format(check_relay, "\n", relay)) self._timeout_delete_subroutine(relay) # self.logger.debug("Completely remove relay in out key compare") continue key_sequence = [out_ref.keys] control_keys = [] # self.logger.debug(f"Benchmark timeout 7: {time.time_ns() - start_time}") start_time = time.time_ns() # buffer_copy = relay.buffer.copy() for check_relay in self.relays.copy().values(): for check_in_ref in check_relay.in_relays: if check_in_ref.rid is None and check_in_ref.relay == relay and check_in_ref.key != "": check_key = check_in_ref.key is_in_buffer = self._call_link_layer_message("check_key_in_relay_buffer", [check_relay.relay_id, check_key], True) if not is_in_buffer: 
control_keys.append(check_key) # self.logger.debug(f"Benchmark timeout 8: {time.time_ns() - start_time}") start_time = time.time_ns() if (self.active or len(relay.in_relays) > 0 or len(control_keys) > 0) and out_ref.out_id is not None: # self.logger.debug(f"Control keys for timeout probing: {control_keys}") header = Header(out_ref.keys, relay_id.layer_id, out_ref.out_id) action = ProbeAction(control_keys, key_sequence) message = Message(header, action) transmit_message = TransmitMessage(message) start_time = time.time_ns() # relay_buffer.put_nowait(transmit_message) self.buffer_put_actions.append((relay, transmit_message)) # self.logger.debug(f"Benchmark timeout buffer append: {time.time_ns() - start_time}") # self.logger.debug(f"Benchmark timeout 9: {time.time_ns() - start_time}") start_time = time.time_ns() # self.logger.debug(f"Benchmark timeout end relay: {time.time() - relay_start}") if not self.active and len(self.relays.values()) == 0: self.shutdown() def _timeout_delete_subroutine(self, relay: Relay): self.delete(relay.relay_id) for diff_relay in self.relays.copy().values(): diff_relay.remove_in_reference_by_relay(relay) self._remove_relay_completely(relay) def delete(self, relay_id: RelayId): """Deletes a relay. For further information see the thesis of the library or the phd thesis of Setzer. Args: relay_id (RelayId): The RelayId of the Relay that should be deleted. 
""" self.logger.debug("Delete call for relay {} \n Stack: ".format(relay_id)) # , traceback.format_stack())) if relay_id in self.relays: delete_relay = self.relays[relay_id] delete_relay.alive = False send_rids = [] for in_ref in delete_relay.in_relays: if in_ref.key is not None and in_ref.rid is not None and in_ref.relay is None \ and in_ref.rid not in send_rids: action = OutRelayClosedAction(delete_relay.relay_id) layer_message = LayerMessage(in_ref.rid, action) # self.buffer.put_nowait(layer_message) self.buffer_put_actions.append((self.RID, layer_message)) send_rids.append(in_ref.rid) delete_relay.clear_in_relays() if delete_relay.is_sink(): self._remove_relay_completely(delete_relay) def merge(self, relays): """Merge method of the RelayLayer. For further information see the thesis of the library or the phd thesis of Setzer. Args: relays (list): A list of relay ids that should be merged to one relay. Returns: RelayId or None: The RelayId of the merged Relay or None if merge was not successful. 
""" start_time = time.time() if len(relays) > 1: # self.logger.debug(f"Merge 1: {relays[0]}") # self.logger.debug(f"Merge 2: {relays[1]}") for index, relay_id in enumerate(relays): if relay_id not in self.relays: self.logger.warning("Merge failed relay {} not in relays".format(relay_id)) return else: relays[index] = self.relays[relay_id] merge_relay = relays[0] common_out_id = merge_relay.out_relay.out_id common_level = merge_relay.level common_alive = merge_relay.alive common_sink_rid = merge_relay.sink_rid if not common_alive: self.logger.warning(f"Merge failed because not alive common") return relay_list_diff = [element.out_relay.keys for element in self.relays.copy().values() if element not in relays] out_keys = set() for relay in relays: out_reference = relay.out_relay if out_reference.out_id != common_out_id or relay.level != common_level or relay.alive != common_alive \ or relay.sink_rid != common_sink_rid or len(relay.in_relays) > 0: self.logger.warning("Merge failed because first check {} {}".format(merge_relay, relay)) return for key in out_reference.keys: out_keys.add(key) for diff_keys in relay_list_diff: if key in diff_keys: self.logger.warning(f"Merge failed because key in diff keys: {diff_keys}") return merge_relay.validated = False merge_relay.out_relay.keys = out_keys merge_relay.out_relay.out_id = common_out_id merge_relay.level = common_level merge_relay.sink_rid = common_sink_rid # merge_relay_buffer = self.relay_buffers[merge_relay.relay_id] # merge in references for relay in relays: if relay == merge_relay: continue for diff_relay in self.relays.copy().values(): if diff_relay in relays: continue diff_relay.replace_relay_in_references(relay, merge_relay) self._remove_relay_completely(relay) # while True: # try: # message = relay_buffer.get(False) # merge_relay_buffer.put_nowait(message) # except queue.Empty: # break # merge_relay.buffer.extend(buffer_list) # self.add_relay_to_layer(merge_relay) self.relays[merge_relay.relay_id] = merge_relay # 
self.logger.debug(f"Merged into new relay: {merge_relay}") return merge_relay.relay_id def get_relays(self): """Gets all RelayIds of the Relays present in the RelayLayer. Returns: list: A list of RelayIds present in the RelayLayer. """ return [relay.relay_id for relay in self.relays.copy().values() if relay.alive] def get_validated_relays(self): """Gets all RelayIds of the validated Relays present in the RelayLayer. Returns: list: A list of RelayIds which Relay is validated present in the RelayLayer. """ return [relay.relay_id for relay in self.relays.copy().values() if relay.alive and relay.validated] def check_relay_exists(self, relay_id: RelayId): """Checks if a Relay is existing in the RelayLayer. Args: relay_id (RelayId): The RelayId representing the Relay shat should be checked. Returns: bool: True if Relay exists in RelayLayer, False otherwise. """ return relay_id in self.relays def validate_relay(self, relay_id: RelayId): """Validates a given Relay. The method validates the Relay by setting its validated flag to True. Args: relay_id (RelayId): The RelayId of the Relay that should be validated. """ if relay_id in self.relays: this_relay = self.relays[relay_id] this_relay.validated = True def stop(self): """Stops the Relay Layer. The method stops by deleting all relays and setting the active state to False. """ for relay in self.relays.copy().values(): self.delete(relay.relay_id) self.active = False def shutdown(self): """Shutdown the RelayLayer completely without removing Relay connections. 
""" # self.logger.warning("Relay layer should be shutdown") self.running = False # self.timeout_thread.join() # self.logger.warning("Joined TimeoutThread") self.relays.clear() self.buffer_put_thread.join() # self.logger.debug("Call Shutdown") self._call_link_layer_message("shutdown", []) # self.logger.debug("After Called Shutdown") # self.logger.warning("Called shutdown") # print("Link Layer Process {}".format(self.link_layer_process.pid)) # self.logger.warning("Pre Join") self.link_layer_process.join(3) if self.link_layer_process.exitcode is None: self.link_layer_process.kill() # self.logger.warning("After join") # self.logger.warning("Done shutdown relay layer {}".format(self.RID)) def send(self, send_relay_id: RelayId, action: Action): """Send a action over a specific relay. For further information see the thesis of this library. Args: send_relay_id (RelayId): The RelayId representing the Relay which should transmit the action. action (Action): The action that should be transmitted. """ if send_relay_id not in self.relays: return False # self.logger.warning( # f"Tried send action {action}\n Layer has no relay for the given relay {send_relay}\n") # self.logger.warning(f"\n Stack: {traceback.format_stack()}") # if relay_id.layer_id == self.RID: # self.logger.debug(f"Add relay to layer") # self.add_relay_to_layer(send_relay) # relay = send_relay # else: # self.logger.warning(f"Cannot add relay to layer in send with {send_relay}") # return False else: relay = self.relays[send_relay_id] # self.logger.debug(f"Send to {relay.relay_id}action {action}") if not self.running: self.logger.warning("Cannot send relay layer not running anymore") if isinstance(action, Action): if relay.alive: if not relay.validated: relay.validated = True # Relay is sink relay and should send the action to node if relay.out_relay.out_id is None: # relay_buffer.put_nowait(action) self.buffer_put_actions.append((relay, action)) else: for index, parameter in enumerate(action.parameters): if 
isinstance(parameter, RelayId): if parameter in self.relays: parameter_relay = self.relays[parameter] key = KeyGeneration.generate_key(self.RID) new_in_ref = InReference(key, None, relay) parameter_relay.add_in_reference(new_in_ref) relay_parameter = RelayParameter(key, parameter_relay.relay_id, parameter_relay.level + 1, parameter_relay.sink_rid) action.parameters[index] = relay_parameter else: # self.logger.warning(f"No relay for parameter id {parameter}") print("ERRROR2") header = Header(relay.out_relay.keys, relay.relay_id.layer_id, relay.out_relay.out_id) message = Message(header, action) transmit_message = TransmitMessage(message) self.logger.debug(f"Send call with {transmit_message}\n") # relay_buffer.put_nowait(transmit_message) self.buffer_put_actions.append((relay, transmit_message)) return True else: print("RELAY {}, {} NOT ALIVE!!".format(relay.relay_id, relay.sink_rid)) self.logger.warning("Cannot send on not alive relay {}".format(relay.relay_id)) def handle_transmit(self, message): """The method is called when a transmit message is received by the RelayLayer. For further information see the thesis of this library. Args: message (Message): The message that got transmitted. 
""" start_time = time.time() if isinstance(message, Message): header = message.header authorized, relay = self._get_active_relay_for_header(header) # self.logger.debug(f"Benchmark transmit 1: {time.time() - start_time}") start_time = time.time() self.logger.debug(f"Transmit message with header {header}\n and message {message}") if relay and authorized: self.monitor_transmit(message, relay) # self.logger.debug(f"Transmit message with header {header} for relay {relay}") header_keys = header.keys header_sender_rid = header.sender_rid for key in header_keys: for in_ref in relay.in_relays: if in_ref.key == key and in_ref.rid is None and in_ref.relay is not None \ and in_ref.relay.sink_rid == header_sender_rid: in_ref.relay = None in_ref.rid = header_sender_rid # self.logger.debug(f"Benchmark transmit activate connections: {time.time() - start_time}") start_time = time.time() valid_relay_keys = relay.get_valid_keys(header_sender_rid) false_keys = [key for key in header_keys if key not in valid_relay_keys] # self.logger.debug(f"Benchmark transmit get false keys: {time.time() - start_time}") start_time = time.time() # some keys are not valid and should be corrected if len(false_keys) > 0: self.logger.debug("Some keys were invalid: {}\n".format(', '.join(map(str, false_keys)))) # self.logger.debug("Valid keys: {}".format(valid_relay_keys)) self.logger.debug("Send not Authorized in false keys") action = NotAuthorizedAction(false_keys, header.out_id) layer_message = LayerMessage(header_sender_rid, action) # self.buffer.put_nowait(layer_message) self.buffer_put_actions.append((self.RID, layer_message)) # self.logger.debug(f"Benchmark transmit 2: {time.time() - start_time}") start_time = time.time() if relay.out_relay.out_id is None: # relay is a sink relay and has no out reference # if action is a probe action if isinstance(message.action, ProbeAction): # self.logger.debug(f"Got Probe action: {message.action}") # self.logger.debug(f"For Relay: {relay}") control_keys = 
message.action.control_keys key_sequence = message.action.key_sequence for control_key in control_keys: if not self._check_if_relay_has_out_key(control_key): # self.logger.debug( # f"Benchmark transmit append probe after check: {time.time() - start_time}") start_time = time.time() key_k = key_sequence[-1] # check if probe fails rids = [] for in_ref in relay.in_relays: if in_ref.key in key_k and in_ref.relay is None and in_ref.rid is not None: rids += [in_ref.rid] if len(rids) > 0: # self.logger.debug("Probing failed") # print("Probing Failed!") # if len(rids) > 1: # print(f"More than one rid found: {', '.join(map(str, rids))}") send_rid = rids[0] action = ProbeFailAction(control_key, key_sequence) layer_message = LayerMessage(send_rid, action) # self.buffer.put_nowait(layer_message) self.buffer_put_actions.append((self.RID, layer_message)) else: # self.logger.debug( # f"Benchmark transmit append probe after check: {time.time() - start_time}") start_time = time.time() # self.logger.debug( # f"Benchmark transmit append probe for one key: {time.time() - start_time}") start_time = time.time() elif self._check_all_parameter_ids_belong_to_same_rid(message): # self.logger.debug(f"Forward message: {message}") action = message.action replace_parameter_list = [] for index, parameter in enumerate(action.parameters): if isinstance(parameter, RelayParameter): parameter_key = parameter.key ref_relay = self._get_relay_with_out_key(parameter_key) # self.logger.debug(f"Benchmark transmit replace get relay: {time.time() - start_time}") start_time = time.time() # print(self.RID, parameter, action.action_type) if parameter.level > 0 and ref_relay is None: self.last_id += 1 new_relay = Relay(RelayId(self.RID, self.last_id)) new_relay.out_relay.keys.add(parameter_key) new_relay.out_relay.out_id = parameter.relay_id new_relay.level = parameter.level new_relay.sink_rid = parameter.rid new_relay.validated = False probe_action = ProbeAction([], [parameter_key]) new_header = 
Header({parameter_key}, new_relay.relay_id.layer_id, parameter.relay_id) new_message = Message(new_header, probe_action) transmit_message = TransmitMessage(new_message) # self.logger.debug( # f"Benchmark transmit replace before replacement: {time.time() - start_time}") start_time = time.time() # new_relay_buffer.put_nowait(transmit_message) self.buffer_put_actions.append((new_relay, transmit_message)) # self.logger.debug( # f"Benchmark transmit replace after append: {time.time() - start_time}") start_time = time.time() self.add_relay_to_layer(new_relay) # self.logger.debug( # f"Benchmark transmit replace append and add: {time.time() - start_time}") replace_parameter_list += [(index, new_relay.relay_id)] # self.logger.debug( # f"Replacing to relay {new_relay.relay_id} for parameters in {action}:" # f" {parameter}") else: replace_parameter_list += [(index, None)] # self.logger.debug( # f"Benchmark transmit replace after replacement: {time.time() - start_time}") start_time = time.time() for index, replace_element in replace_parameter_list: action.parameters[index] = replace_element # relay_buffer.put_nowait(action) self.buffer_put_actions.append((relay, action)) else: self.logger.error("Message is corrupted: {}\n".format(message)) # self.logger.debug(f"Benchmark transmit append: {time.time() - start_time}") start_time = time.time() else: # message gets forwarded to next relay action = message.action if isinstance(action, ProbeAction): action.key_sequence.append(relay.out_relay.keys) control_keys = action.control_keys for control_key in control_keys.copy(): if self._call_link_layer_message("check_key_in_relay_buffer", [relay.relay_id, control_key], True): control_keys.remove(control_key) send_header = Header(relay.out_relay.keys, relay.relay_id.layer_id, relay.out_relay.out_id) send_message = Message(send_header, action) transmit_message = TransmitMessage(send_message) # relay_buffer.put_nowait(transmit_message) self.buffer_put_actions.append((relay, transmit_message)) 
# self.logger.debug(f"Benchmark transmit forward: {time.time() - start_time}") start_time = time.time() elif relay and not authorized: self.logger.debug(f"Not authorized for header {header} and relay {relay}") # print(self.RID, f"Send not authorized in tramsit for header {header} and relay {relay}") action = NotAuthorizedAction(header.keys, header.out_id) layer_message = LayerMessage(header.sender_rid, action) # self.buffer.put_nowait(layer_message) self.buffer_put_actions.append((self.RID, layer_message)) elif header.out_id is not None and header.out_id.layer_id == self.RID: action = OutRelayClosedAction(header.out_id) layer_message = LayerMessage(header.sender_rid, action) # self.buffer.put_nowait(layer_message) self.buffer_put_actions.append((self.RID, layer_message)) else: self.logger.warning("No Relay found for header {}\n".format(header)) def handle_not_authorized(self, keys, out_id): """The method is called when a NotAuthorized message is received by the RelayLayer. For further information see the thesis of this library. Args: keys (list): List of keys. out_id (RelayId): The RelayId that is not authorized. """ self.logger.debug(f"Handle not authorized executed with: {keys}, {out_id}") for key in keys.copy(): if out_id is not None: relay = self._get_relay_by_out_id_and_out_key(out_id, key) if relay is not None: out_ref = relay.out_relay out_ref.remove_key(key) if len(out_ref.keys) == 0: # remove all pending relays sent via r for check_relay in self.relays.copy().values(): check_relay.remove_in_reference_by_relay(relay) self.delete(relay.relay_id) self._remove_relay_completely(relay) else: self.logger.warning( "NOT_AUTHORIZED: No relay found with out id {} and key {}\n".format(out_id, key)) else: # Message corrupted self.logger.error("Out id is empty in handle not authorized\n") def handle_probe_fail(self, key, key_sequence): """The method is called when a ProbeFail message is received by the RelayLayer. For further information see the thesis of this library. 
Args: key (str): Key that failed the Probe message. key_sequence (list): List of key lists that represent the route the Probe message went. """ self.logger.debug(f"Handle probe_fail executed with: {key}, {key_sequence}") # get relays where some keys from key_sequence[k] are in relay out keys k = len(key_sequence) if k == 0: # no key sequence given self.logger.error("PROBE_FAIL: Key sequence empty\n") return relay = self._get_relay_by_out_key_intersection(key_sequence[k - 1]) if relay is not None: if k > 1: # call needs to be passed on compare_keys = key_sequence[k - 2] new_key_sequence = key_sequence[:-1] for in_ref in relay.in_relays: if in_ref.relay is None and in_ref.rid is not None and in_ref.key in compare_keys: action = ProbeFailAction(key, new_key_sequence) layer_message = LayerMessage(in_ref.rid, action) # self.buffer.put_nowait(layer_message) self.buffer_put_actions.append((self.RID, layer_message)) break else: for diff_relay in self.relays.copy().values(): diff_relay.remove_in_reference_by_relay_and_key(key, relay) def handle_in_relay_closed(self, keys: list, sender_rid: RelayLayerId, relay_id: RelayId): """The method is called when a InRelayClosed message is received by the RelayLayer. For further information see the thesis of this library. Args: keys (list): The list of keys that are used in the closed connection. sender_rid (RelayLayerId): The RelayLayerId of the RelayLayer that sent the message. relay_id (RelayId): The relay id that is closed. """ self.logger.debug(f"Handle in_relay_closed executed with: {keys}, {sender_rid}, {relay_id}") if relay_id in self.relays: relay = self.relays[relay_id] for key in keys: relay.remove_in_reference_by_key_and_rid(key, sender_rid) def handle_ping(self, relay_id: RelayId, level: int, sink_rid: RelayLayerId, key: str): """The method is called when a Ping message is received by the RelayLayer. For further information see the thesis of this library. 
Args: relay_id (RelayId): The relay id there should be a connection to. level (list): The level of the connection. sink_rid (RelayLayerId): The sink rid of the connection. key (str): The key of the connection """ self.logger.debug(f"Handle ping executed with: {relay_id}, {level}, {sink_rid}, {key}") if relay_id is not None: relay = self._get_relay_by_id_and_out_key(relay_id, key) if relay is not None: relay.sink_rid = sink_rid if relay.level > level + 1: relay.level = level + 1 if relay.level < level + 1: # self.logger.debug(f"PING failed and relay closed {relay}\n with parameter: {relay_id}," # f" {level}, {sink_rid}, {key}\n") self.delete(relay.relay_id) for diff_relay in self.relays.copy().values(): diff_relay.remove_in_reference_by_relay(relay) self._remove_relay_completely(relay) else: self.logger.debug(f"NO RELAY FOR PING -> In Relay closed\n with parameter: {relay_id}," f" {level}, {sink_rid}, {key}\n") action = InRelayClosedAction([key], self.RID, relay_id) layer_message = LayerMessage(relay_id.layer_id, action) # self.buffer.put_nowait(layer_message) self.buffer_put_actions.append((self.RID, layer_message)) else: print("No pid given") def handle_out_relay_closed(self, relay_id: RelayId): """The method is called when a OutRelayClosed message is received by the RelayLayer. For further information see the thesis of this library. Args: relay_id (RelayId): The RelayId of the Relay that is closed. """ self.logger.debug(f"Handle out_relay_closed executed with: {relay_id}") if isinstance(relay_id, RelayId): closed_relay = self._get_relay_by_out_id(relay_id) if closed_relay is not None: # remove all "pending" relays sent via r for relay in self.relays.copy().values(): relay.remove_in_reference_by_relay(closed_relay) self.delete(closed_relay.relay_id) self._remove_relay_completely(closed_relay) else: self.logger.warning("OUT_RELAY_CLOSED: No relay with relay id {} found".format(relay_id)) else: self.logger.error( "Relay id {} provided in the wrong format. 
Should be an object of RelayId".format(relay_id)) def _has_relay_for_in_relay(self, relay): for check_relay in self.relays.copy().values(): for in_ref in check_relay.in_relays: if in_ref.relay == relay: # self.logger.debug(f"Cannot send in relay closed because {relay.relay_id} is in {check_relay}") return True return False def _remove_relay_completely(self, relay): self.logger.debug("Call for remove relay completely for relay {} \n Stack: " .format(relay.relay_id)) # , traceback.format_stack())) if relay.relay_id in self.relays: self.relays.pop(relay.relay_id) # self.relay_buffers.pop(relay.relay_id) if relay.relay_id in self.relay_windows: self.relay_windows.pop(relay.relay_id) if relay.relay_id in self.transmit_count: self.transmit_count.pop(relay.relay_id) self._call_link_layer_message("stop_relay_watch", [relay.relay_id]) def _get_active_relay_for_header(self, header): if not isinstance(header, Header): self.logger.error("No header provided to get relay for specific header\n") return False, None out_id = header.out_id keys = header.keys sender_rid = header.sender_rid # # self.logger.debug(f"Get active relay for header with header: {header}") if out_id in self.relays: relay = self.relays[out_id] if relay.alive: for in_ref in relay.in_relays.copy(): # self.logger.debug(f"In reference: {in_ref}") in_ref_key = in_ref.key in_ref_rid = in_ref.rid in_ref_relay = in_ref.relay # # self.logger.debug(f"In ref relay: {in_ref_relay}") if in_ref_key in keys: if in_ref_rid is not None and in_ref_relay is None and in_ref_rid == sender_rid: return True, relay elif in_ref_rid is None and in_ref_relay is not None \ and in_ref_relay.relay_id.layer_id == self.RID \ and in_ref_relay.sink_rid == sender_rid: return True, relay # relay with id found but is not authorized return False, relay # no relay found return False, None def _get_relay_by_out_id(self, relay_id): for relay in self.relays.copy().values(): if relay.out_relay.out_id == relay_id: return relay return None def 
_get_relay_by_id_and_out_key(self, relay_id: RelayId, key): for relay in self.relays.copy().values(): if relay.out_relay.out_id == relay_id and key in relay.out_relay.keys: return relay return None def _get_relay_by_out_id_and_out_key(self, out_id: RelayId, key): for relay in self.relays.copy().values(): if relay.out_relay.out_id == out_id and key in relay.out_relay.keys: return relay return None def _get_relay_by_out_key_intersection(self, keys): compare_keys = set(keys) for relay in self.relays.copy().values(): out_keys = set(relay.out_relay.keys) diff_set = compare_keys.intersection(out_keys) if len(diff_set) > 0: return relay return None def _check_if_relay_has_out_key(self, key): for relay in self.relays.copy().values(): # # self.logger.debug(f"Check out key {key} for probing for relay: {relay}") if key in relay.out_relay.keys: return True return False def _get_relay_with_out_key(self, key): for relay in self.relays.copy().values(): if key in relay.out_relay.keys: return relay return None def _check_all_parameter_ids_belong_to_same_rid(self, message: Message): action = message.action same_rid = None for parameter in action.parameters: if isinstance(parameter, RelayParameter): if same_rid is None: same_rid = parameter.relay_id.layer_id elif parameter.relay_id.layer_id != same_rid: return False return True # DOS Resistance methods def monitor_transmit(self, message, relay): """Monitors transmits. For further information see thesis of this library. Args: message (Message): The message that is transmitted. relay (Relay): The Relay that transmitted the message. """ if message.action.action_type in ModuleConfig.NO_MONITOR_ACTIONS: return relay_id = relay.relay_id if relay_id not in self.transmit_count: self.transmit_count[relay_id] = 1 else: self.transmit_count[relay_id] += 1 def monitor_timeout_transmit_rate(self, relay): """Monitors transmit rates. For further information see thesis of this library. 
Args: relay (Relay): The Relay for which the rate should be inserted. """ relay_id = relay.relay_id if relay_id in self.transmit_count: if relay_id not in self.relay_windows: self.relay_windows[relay_id] = deque(maxlen=ModuleConfig.WINDOW_SIZE) transmit_count = self.transmit_count[relay_id] self.transmit_count[relay_id] = 0 self.relay_windows[relay_id].append(transmit_count) def check_dos_attack(self, relay): """Checks if a dos attack is likely. For further information see thesis of this library. Args: relay (Relay): The Relay for which the rates should be checked. """ relay_id = relay.relay_id if relay_id in self.relay_windows: relay_window = self.relay_windows[relay_id] window_list = list(relay_window) if len(window_list) >= ModuleConfig.WINDOW_SIZE: window_divider = len(window_list) // 2 window_a = window_list[:window_divider] window_b = window_list[window_divider:] # print(window_list) change_rate = ConceptChange.calculate_windows(window_a, window_b) # print(change_rate) if change_rate < relay.dos_threshold: return True return FalseMethods
def add_relay_to_layer(self, new_relay: Relay)-
Adds a Relay object to the RelayLayer.
The method only accepts a Relay object and adds it to the layer only if its relay id is not already present in the layer.
Args
new_relay:Relay- The Relay object that should be added to the layer.
Expand source code
def add_relay_to_layer(self, new_relay: Relay):
    """Register a Relay object with this RelayLayer.

    An argument that is not a Relay instance is ignored, and so is a Relay
    whose relay id is already registered; in both cases the layer is left
    unchanged.

    Args:
        new_relay (Relay): The Relay object that should be added to the layer.
    """
    if not isinstance(new_relay, Relay):
        return
    if new_relay.relay_id in self.relays:
        return
    self.relays[new_relay.relay_id] = new_relay
# Next documented method: def check_dos_attack(self, relay)
Checks if a dos attack is likely.
For further information see thesis of this library.
Args
relay:Relay- The Relay for which the rates should be checked.
Expand source code
def check_dos_attack(self, relay):
    """Report whether the recorded transmit rates of a Relay hint at a DoS attack.

    The rate window stored for the Relay is only evaluated once it holds at
    least ModuleConfig.WINDOW_SIZE entries. It is then split at its midpoint and
    both halves are passed to ConceptChange.calculate_windows; a result below
    relay.dos_threshold is reported as an attack.
    For further information see thesis of this library.

    Args:
        relay (Relay): The Relay for which the rates should be checked.

    Returns:
        bool: True if the computed change rate is below the Relay's threshold,
            False otherwise (also when no window or too short a window exists).
    """
    relay_id = relay.relay_id
    if relay_id not in self.relay_windows:
        return False

    samples = list(self.relay_windows[relay_id])
    sample_count = len(samples)
    if sample_count < ModuleConfig.WINDOW_SIZE:
        return False

    midpoint = sample_count // 2
    first_half, second_half = samples[:midpoint], samples[midpoint:]
    change_rate = ConceptChange.calculate_windows(first_half, second_half)
    if change_rate < relay.dos_threshold:
        return True
    return False
# Next documented method: def check_relay_exists(self, relay_id: RelayId)
Checks if a Relay is existing in the RelayLayer.
Args
relay_id:RelayId- The RelayId representing the Relay that should be checked.
Returns
bool- True if Relay exists in RelayLayer, False otherwise.
Expand source code
def check_relay_exists(self, relay_id: RelayId):
    """Tell whether a Relay with the given id is registered in the RelayLayer.

    Args:
        relay_id (RelayId): The RelayId representing the Relay that should be checked.

    Returns:
        bool: True if Relay exists in RelayLayer, False otherwise.
    """
    registered_relays = self.relays
    return relay_id in registered_relays
# Next documented method: def delete(self, relay_id: RelayId)
Deletes a relay.
For further information see the thesis of the library or the phd thesis of Setzer.
Args
relay_id:RelayId- The RelayId of the Relay that should be deleted.
Expand source code
def delete(self, relay_id: RelayId):
    """Delete a relay.

    If the relay id is registered, the relay is marked as not alive and one
    OutRelayClosedAction is queued in buffer_put_actions for every distinct
    RelayLayerId that appears in an in reference carrying a key and a rid but no
    relay object. Afterwards the in references are cleared and, if the relay is
    a sink, it is removed from the layer completely. Unknown relay ids are only
    logged.
    For further information see the thesis of the library or the phd thesis of
    Setzer.

    Args:
        relay_id (RelayId): The RelayId of the Relay that should be deleted.
    """
    self.logger.debug("Delete call for relay {} \n Stack: ".format(relay_id))
    if relay_id not in self.relays:
        return

    doomed_relay = self.relays[relay_id]
    doomed_relay.alive = False

    notified_rids = []
    for in_ref in doomed_relay.in_relays:
        points_to_remote_layer = (
            in_ref.key is not None and in_ref.rid is not None and in_ref.relay is None
        )
        if not points_to_remote_layer or in_ref.rid in notified_rids:
            continue
        # Every remote layer is told only once that this out relay is closed.
        closed_action = OutRelayClosedAction(doomed_relay.relay_id)
        closed_message = LayerMessage(in_ref.rid, closed_action)
        self.buffer_put_actions.append((self.RID, closed_message))
        notified_rids.append(in_ref.rid)

    doomed_relay.clear_in_relays()
    if doomed_relay.is_sink():
        self._remove_relay_completely(doomed_relay)
# Next documented method: def get_key_for_layer(self)
Gets an authentication key generated by the RelayLayer.
Returns
str- A key used for authenticating a connection.
Expand source code
def get_key_for_layer(self):
    """Get an authentication key generated by the RelayLayer.

    The key is produced by KeyGeneration.generate_key from this layer's RID.

    Returns:
        str: A key used for authenticating a connection.
    """
    layer_key = KeyGeneration.generate_key(self.RID)
    return layer_key
# Next documented method: def get_level(self, relay_id: RelayId)
Gets the connection level of a relay.
Args
relay_id:RelayId- The RelayId representing the Relay that should be checked.
Returns
int- The level of the connection.
Expand source code
def get_level(self, relay_id: RelayId):
    """Get the connection level of a relay.

    Args:
        relay_id (RelayId): The RelayId representing the Relay that should be checked.

    Returns:
        int: The level of the connection, or None if no Relay with this id is
            registered in the RelayLayer.
    """
    if relay_id not in self.relays:
        return None
    return self.relays[relay_id].level
# Next documented method: def get_new_relay_object(self)
Gets a new relay object.
The method creates a new Relay in the RelayLayer, but instead of adding it to the RelayLayer it only returns the object. This is needed to create connections between Nodes. The Relay can later be added to the RelayLayer by executing the add_relay_to_layer method.
Returns
Relay- A new Relay object.
Expand source code
def get_new_relay_object(self):
    """Create a new Relay object without registering it.

    The method advances the layer's last_id counter and builds a Relay whose
    RelayId consists of this layer's RID and the new counter value. The Relay is
    only returned, not added to the RelayLayer. This is needed to create
    connections between Nodes. The Relay can later be added to the RelayLayer by
    executing the add_relay_to_layer method.

    Returns:
        Relay: A new Relay object.
    """
    self.last_id += 1
    return Relay(RelayId(self.RID, self.last_id))
# Next documented method: def get_relay_by_relay_id(self, relay_id)
Gets the Relay object defined by the given RelayId.
This should only be used for analysing purposes. Normally one should not have access to the relay object from outside.
Args
relay_id:RelayId- The RelayId of the Relay that should be returned.
Returns
Relay- The Relay object defined by the given RelayId or None if the Relay is not existent in the RelayLayer.
Expand source code
def get_relay_by_relay_id(self, relay_id):
    """Get the Relay object defined by the given RelayId.

    This should only be used for analysing purposes. Normally one should not
    have access to the relay object from outside.

    Args:
        relay_id (RelayId): The RelayId of the Relay that should be returned.

    Returns:
        Relay: The Relay object defined by the given RelayId or None if the
            Relay is not existent in the RelayLayer.
    """
    return self.relays[relay_id] if relay_id in self.relays else None
# Next documented method: def get_relays(self)
Gets all RelayIds of the Relays present in the RelayLayer.
Returns
list- A list of RelayIds present in the RelayLayer.
Expand source code
def get_relays(self):
    """Get the RelayIds of all alive Relays present in the RelayLayer.

    The relays are read from a copy of the relay mapping, and Relays whose
    alive flag is not set are left out.

    Returns:
        list: A list of RelayIds present in the RelayLayer.
    """
    alive_ids = []
    for relay in self.relays.copy().values():
        if relay.alive:
            alive_ids.append(relay.relay_id)
    return alive_ids
# Next documented method: def get_sink_node_id(self, relay_id: RelayId)
Gets the node id of the node holding the sink relay of the connection defined by the given relay.
Args
relay_id:RelayId- The RelayId representing the Relay that should be checked.
Returns
int- The node id of the sink relay.
Expand source code
def get_sink_node_id(self, relay_id: RelayId):
    """Get the node id of the node holding the sink relay of a connection.

    The value is read from the sink_rid stored on the Relay with the given id.

    Args:
        relay_id (RelayId): The RelayId representing the Relay that should be checked.

    Returns:
        int: The node id of the sink relay, or None if no Relay with this id is
            registered in the RelayLayer.
    """
    if relay_id not in self.relays:
        return None
    sink_rid = self.relays[relay_id].sink_rid
    return sink_rid.node_id
# Next documented method: def get_validated_relays(self)
Gets all RelayIds of the validated Relays present in the RelayLayer.
Returns
list- A list of the RelayIds of the validated Relays present in the RelayLayer.
Expand source code
def get_validated_relays(self):
    """Get the RelayIds of all validated Relays present in the RelayLayer.

    The relays are read from a copy of the relay mapping; a Relay is listed
    only if both its alive flag and its validated flag are set.

    Returns:
        list: A list of the RelayIds of the validated Relays present in the
            RelayLayer.
    """
    validated_ids = []
    for relay in self.relays.copy().values():
        if not relay.alive:
            continue
        if relay.validated:
            validated_ids.append(relay.relay_id)
    return validated_ids
# Next documented method: def handle_in_relay_closed(self, keys: list, sender_rid: RelayLayerId, relay_id: RelayId)
The method is called when an InRelayClosed message is received by the RelayLayer.
For further information see the thesis of this library.
Args
keys:list- The list of keys that are used in the closed connection.
sender_rid:RelayLayerId- The RelayLayerId of the RelayLayer that sent the message.
relay_id:RelayId- The relay id that is closed.
Expand source code
def handle_in_relay_closed(self, keys: list, sender_rid: RelayLayerId, relay_id: RelayId):
    """Handle an InRelayClosed message received by the RelayLayer.

    If a Relay with the given id is registered, the in reference matching each
    key together with the sender's RelayLayerId is removed from it. Unknown
    relay ids are only logged.
    For further information see the thesis of this library.

    Args:
        keys (list): The list of keys that are used in the closed connection.
        sender_rid (RelayLayerId): The RelayLayerId of the RelayLayer that sent the message.
        relay_id (RelayId): The relay id that is closed.
    """
    self.logger.debug(f"Handle in_relay_closed executed with: {keys}, {sender_rid}, {relay_id}")
    if relay_id not in self.relays:
        return
    target_relay = self.relays[relay_id]
    for closed_key in keys:
        target_relay.remove_in_reference_by_key_and_rid(closed_key, sender_rid)
# Next documented method: def handle_not_authorized(self, keys, out_id)
The method is called when a NotAuthorized message is received by the RelayLayer.
For further information see the thesis of this library.
Args
keys:list- List of keys.
out_id:RelayId- The RelayId that is not authorized.
Expand source code
def handle_not_authorized(self, keys, out_id):
    """Handle a NotAuthorized message received by the RelayLayer.

    For every key the relay whose out reference points to out_id and contains
    that key is looked up and the key is removed from its out reference. Once
    the out reference has no keys left, all in references to that relay are
    removed from the other relays and the relay is deleted and removed from the
    layer. A missing out id is logged as an error and an unknown relay as a
    warning, once per key.
    For further information see the thesis of this library.

    Args:
        keys (list): List of keys.
        out_id (RelayId): The RelayId that is not authorized.
    """
    self.logger.debug(f"Handle not authorized executed with: {keys}, {out_id}")
    for key in keys.copy():
        if out_id is None:
            # Message corrupted
            self.logger.error("Out id is empty in handle not authorized\n")
            continue

        relay = self._get_relay_by_out_id_and_out_key(out_id, key)
        if relay is None:
            self.logger.warning(
                "NOT_AUTHORIZED: No relay found with out id {} and key {}\n".format(out_id, key))
            continue

        out_ref = relay.out_relay
        out_ref.remove_key(key)
        if len(out_ref.keys) != 0:
            continue

        # No key is left for this connection: drop every pending relay sent via it.
        for other_relay in self.relays.copy().values():
            other_relay.remove_in_reference_by_relay(relay)
        self.delete(relay.relay_id)
        self._remove_relay_completely(relay)
# Next documented method: def handle_out_relay_closed(self, relay_id: RelayId)
The method is called when an OutRelayClosed message is received by the RelayLayer.
For further information see the thesis of this library.
Args
relay_id:RelayId- The RelayId of the Relay that is closed.
Expand source code
def handle_out_relay_closed(self, relay_id: RelayId):
    """Handle an OutRelayClosed message received by the RelayLayer.

    The relay whose out reference points to relay_id is looked up. If it
    exists, all in references to it are removed from the other relays and it is
    deleted and removed from the layer. An argument that is not a RelayId is
    logged as an error, an unknown relay as a warning.
    For further information see the thesis of this library.

    Args:
        relay_id (RelayId): The RelayId of the Relay that is closed.
    """
    self.logger.debug(f"Handle out_relay_closed executed with: {relay_id}")
    if not isinstance(relay_id, RelayId):
        self.logger.error(
            "Relay id {} provided in the wrong format. Should be an object of RelayId".format(relay_id))
        return

    closed_relay = self._get_relay_by_out_id(relay_id)
    if closed_relay is None:
        self.logger.warning("OUT_RELAY_CLOSED: No relay with relay id {} found".format(relay_id))
        return

    # remove all "pending" relays sent via the closed relay
    for other_relay in self.relays.copy().values():
        other_relay.remove_in_reference_by_relay(closed_relay)
    self.delete(closed_relay.relay_id)
    self._remove_relay_completely(closed_relay)
# Next documented method: def handle_ping(self, relay_id: RelayId, level: int, sink_rid: RelayLayerId, key: str)
The method is called when a Ping message is received by the RelayLayer.
For further information see the thesis of this library.
Args
relay_id:RelayId- The relay id there should be a connection to.
level:int- The level of the connection.
sink_rid:RelayLayerId- The sink rid of the connection.
key:str- The key of the connection
Expand source code
def handle_ping(self, relay_id: RelayId, level: int, sink_rid: RelayLayerId, key: str):
    """Handle a Ping message received by the RelayLayer.

    The relay whose out reference points to relay_id and contains key is looked
    up. If it exists, its sink_rid is updated and its level is lowered to
    level + 1 when it was larger; a relay whose level is smaller than level + 1
    is deleted, unreferenced from the other relays and removed from the layer.
    If no such relay exists, an InRelayClosedAction addressed to the layer of
    relay_id is queued in buffer_put_actions. A missing relay id is reported
    through the logger, like the malformed-input cases of the other handlers.
    For further information see the thesis of this library.

    Args:
        relay_id (RelayId): The relay id there should be a connection to.
        level (int): The level of the connection.
        sink_rid (RelayLayerId): The sink rid of the connection.
        key (str): The key of the connection
    """
    self.logger.debug(f"Handle ping executed with: {relay_id}, {level}, {sink_rid}, {key}")
    if relay_id is None:
        # Was a bare print(); routed through the logger so it is not lost on stdout.
        self.logger.error("No pid given")
        return

    relay = self._get_relay_by_id_and_out_key(relay_id, key)
    if relay is None:
        self.logger.debug(f"NO RELAY FOR PING -> In Relay closed\n with parameter: {relay_id},"
                          f" {level}, {sink_rid}, {key}\n")
        action = InRelayClosedAction([key], self.RID, relay_id)
        layer_message = LayerMessage(relay_id.layer_id, action)
        self.buffer_put_actions.append((self.RID, layer_message))
        return

    relay.sink_rid = sink_rid
    expected_level = level + 1
    if relay.level > expected_level:
        relay.level = expected_level
    if relay.level < expected_level:
        # The ping contradicts the stored level: close the relay.
        self.delete(relay.relay_id)
        for diff_relay in self.relays.copy().values():
            diff_relay.remove_in_reference_by_relay(relay)
        self._remove_relay_completely(relay)
# Next documented method: def handle_probe_fail(self, key, key_sequence)
The method is called when a ProbeFail message is received by the RelayLayer.
For further information see the thesis of this library.
Args
key:str- Key that failed the Probe message.
key_sequence:list- List of key lists that represent the route the Probe message went.
Expand source code
def handle_probe_fail(self, key, key_sequence):
    """Handle a ProbeFail message received by the RelayLayer.

    The relay whose out keys intersect the last entry of key_sequence is looked
    up. With more than one entry in the sequence the failure is passed on: a
    ProbeFailAction carrying the sequence without its last entry is queued for
    the first in reference that has a rid, no relay object and a key contained
    in the second to last entry. With exactly one entry the in references
    matching key and the found relay are removed from all relays. An empty
    sequence is logged as an error.
    For further information see the thesis of this library.

    Args:
        key (str): Key that failed the Probe message.
        key_sequence (list): List of key lists that represent the route the Probe message went.
    """
    self.logger.debug(f"Handle probe_fail executed with: {key}, {key_sequence}")
    hop_count = len(key_sequence)
    if hop_count == 0:
        # no key sequence given
        self.logger.error("PROBE_FAIL: Key sequence empty\n")
        return

    # Find the relay where some keys of the last hop are in the relay's out keys.
    relay = self._get_relay_by_out_key_intersection(key_sequence[hop_count - 1])
    if relay is None:
        return

    if hop_count > 1:
        # call needs to be passed on
        previous_hop_keys = key_sequence[hop_count - 2]
        shortened_sequence = key_sequence[:-1]
        for in_ref in relay.in_relays:
            if in_ref.relay is None and in_ref.rid is not None and in_ref.key in previous_hop_keys:
                fail_action = ProbeFailAction(key, shortened_sequence)
                fail_message = LayerMessage(in_ref.rid, fail_action)
                self.buffer_put_actions.append((self.RID, fail_message))
                break
    else:
        for other_relay in self.relays.copy().values():
            other_relay.remove_in_reference_by_relay_and_key(key, relay)
# Next documented method: def handle_transmit(self, message)
The method is called when a transmit message is received by the RelayLayer.
For further information see the thesis of this library.
Args
message:Message- The message that got transmitted.
Expand source code
def handle_transmit(self, message):
    """The method is called when a transmit message is received by the RelayLayer.

    Only ``Message`` instances are handled. The relay addressed by the message header is
    looked up together with an authorization flag, and one of four things happens:

    * relay found and authorized: matching in references are switched to the sender, invalid
      header keys are answered with a NotAuthorizedAction, and the action is either consumed
      (sink relay, no out id) or forwarded over the relay's out reference.
    * relay found but not authorized: a NotAuthorizedAction is queued for the sender.
    * no relay, but the header's out id names this layer: an OutRelayClosedAction is queued
      for the sender.
    * otherwise a warning is logged.

    All outgoing work is appended to ``self.buffer_put_actions``.
    For further information see the thesis of this library.

    Args:
        message (Message): The message that got transmitted.
    """
    # start_time is reassigned throughout this method; it is never read by active code
    # (only benchmark logging, which is disabled, used it).
    start_time = time.time()
    if isinstance(message, Message):
        header = message.header
        authorized, relay = self._get_active_relay_for_header(header)
        start_time = time.time()
        self.logger.debug(f"Transmit message with header {header}\n and message {message}")
        if relay and authorized:
            self.monitor_transmit(message, relay)
            header_keys = header.keys
            header_sender_rid = header.sender_rid
            # Activate connections: an in reference that still points to a local relay whose
            # sink is the sender is rewritten to reference the sender's layer id directly.
            for key in header_keys:
                for in_ref in relay.in_relays:
                    if in_ref.key == key and in_ref.rid is None and in_ref.relay is not None \
                            and in_ref.relay.sink_rid == header_sender_rid:
                        in_ref.relay = None
                        in_ref.rid = header_sender_rid
            start_time = time.time()
            valid_relay_keys = relay.get_valid_keys(header_sender_rid)
            false_keys = [key for key in header_keys if key not in valid_relay_keys]
            start_time = time.time()
            # some keys are not valid and should be corrected
            if len(false_keys) > 0:
                self.logger.debug("Some keys were invalid: {}\n".format(', '.join(map(str, false_keys))))
                self.logger.debug("Send not Authorized in false keys")
                action = NotAuthorizedAction(false_keys, header.out_id)
                layer_message = LayerMessage(header_sender_rid, action)
                self.buffer_put_actions.append((self.RID, layer_message))
            start_time = time.time()
            if relay.out_relay.out_id is None:
                # relay is a sink relay and has no out reference
                # if action is a probe action
                if isinstance(message.action, ProbeAction):
                    control_keys = message.action.control_keys
                    key_sequence = message.action.key_sequence
                    for control_key in control_keys:
                        if not self._check_if_relay_has_out_key(control_key):
                            start_time = time.time()
                            key_k = key_sequence[-1]
                            # check if probe fails: collect the remote layers that reach this
                            # relay with one of the keys of the last hop
                            rids = []
                            for in_ref in relay.in_relays:
                                if in_ref.key in key_k and in_ref.relay is None and in_ref.rid is not None:
                                    rids += [in_ref.rid]
                            if len(rids) > 0:
                                # Only the first matching layer is notified.
                                send_rid = rids[0]
                                action = ProbeFailAction(control_key, key_sequence)
                                layer_message = LayerMessage(send_rid, action)
                                self.buffer_put_actions.append((self.RID, layer_message))
                        else:
                            start_time = time.time()
                        start_time = time.time()
                elif self._check_all_parameter_ids_belong_to_same_rid(message):
                    # Deliver the action to the sink relay; RelayParameter entries are
                    # replaced by local relay ids (or None) before delivery.
                    action = message.action
                    replace_parameter_list = []
                    for index, parameter in enumerate(action.parameters):
                        if isinstance(parameter, RelayParameter):
                            parameter_key = parameter.key
                            ref_relay = self._get_relay_with_out_key(parameter_key)
                            start_time = time.time()
                            if parameter.level > 0 and ref_relay is None:
                                # No local relay uses this key yet: create an unvalidated
                                # relay pointing at the announced relay and probe it.
                                self.last_id += 1
                                new_relay = Relay(RelayId(self.RID, self.last_id))
                                new_relay.out_relay.keys.add(parameter_key)
                                new_relay.out_relay.out_id = parameter.relay_id
                                new_relay.level = parameter.level
                                new_relay.sink_rid = parameter.rid
                                new_relay.validated = False
                                probe_action = ProbeAction([], [parameter_key])
                                new_header = Header({parameter_key}, new_relay.relay_id.layer_id, parameter.relay_id)
                                new_message = Message(new_header, probe_action)
                                transmit_message = TransmitMessage(new_message)
                                start_time = time.time()
                                self.buffer_put_actions.append((new_relay, transmit_message))
                                start_time = time.time()
                                self.add_relay_to_layer(new_relay)
                                replace_parameter_list += [(index, new_relay.relay_id)]
                            else:
                                # Level 0 or key already in use locally: the parameter is
                                # replaced by None.
                                replace_parameter_list += [(index, None)]
                            start_time = time.time()
                    for index, replace_element in replace_parameter_list:
                        action.parameters[index] = replace_element
                    self.buffer_put_actions.append((relay, action))
                else:
                    self.logger.error("Message is corrupted: {}\n".format(message))
                start_time = time.time()
            else:
                # message gets forwarded to next relay
                action = message.action
                if isinstance(action, ProbeAction):
                    # Extend the probe route with this hop and drop control keys that the
                    # link layer reports as present in this relay's buffer.
                    action.key_sequence.append(relay.out_relay.keys)
                    control_keys = action.control_keys
                    for control_key in control_keys.copy():
                        if self._call_link_layer_message("check_key_in_relay_buffer",
                                                         [relay.relay_id, control_key], True):
                            control_keys.remove(control_key)
                send_header = Header(relay.out_relay.keys, relay.relay_id.layer_id,
                                     relay.out_relay.out_id)
                send_message = Message(send_header, action)
                transmit_message = TransmitMessage(send_message)
                self.buffer_put_actions.append((relay, transmit_message))
                start_time = time.time()
        elif relay and not authorized:
            self.logger.debug(f"Not authorized for header {header} and relay {relay}")
            action = NotAuthorizedAction(header.keys, header.out_id)
            layer_message = LayerMessage(header.sender_rid, action)
            self.buffer_put_actions.append((self.RID, layer_message))
        elif header.out_id is not None and header.out_id.layer_id == self.RID:
            # The addressed relay belongs to this layer but does not exist: report it closed.
            action = OutRelayClosedAction(header.out_id)
            layer_message = LayerMessage(header.sender_rid, action)
            self.buffer_put_actions.append((self.RID, layer_message))
        else:
            self.logger.warning("No Relay found for header {}\n".format(header))
Returns the amount of incoming connections of a given relay.
Args
relay_id:RelayId- The RelayId representing the Relay that should be checked.
Returns
int- The amount of incoming connections.
Expand source code
def has_incoming(self, relay_id: RelayId):
    """Returns the amount of incoming connections of a given relay.

    Args:
        relay_id (RelayId): The RelayId representing the Relay that should be checked.

    Returns:
        int: The amount of incoming connections as reported by the relay, or 0 if the
            RelayLayer holds no relay with the given id.
    """
    if relay_id not in self.relays:
        # An unknown relay has no incoming connections; previously this fell through
        # and returned None although an int is documented.
        return 0
    return self.relays[relay_id].has_incoming()
Checks a Relay if it is dead.
Args
relay_id:RelayId- The RelayId representing the Relay that should be checked.
Returns
bool- True if the specific relay is dead, False otherwise.
Expand source code
def is_dead(self, relay_id: RelayId):
    """Report whether the relay with the given id is dead.

    Args:
        relay_id (RelayId): The RelayId representing the Relay that should be checked.

    Returns:
        bool: The relay's own is_dead() answer; False when the id is not known to this layer.
    """
    if relay_id not in self.relays:
        return False
    return self.relays[relay_id].is_dead()
Checks a Relay if it is a direct relay.
Args
relay_id:RelayId- The RelayId representing the Relay that should be checked.
Returns
bool- True if the specific relay is a direct relay, False otherwise.
Expand source code
def is_direct(self, relay_id: RelayId):
    """Checks a Relay if it is a direct relay.

    Args:
        relay_id (RelayId): The RelayId representing the Relay that should be checked.

    Returns:
        bool: True if the specific relay is a direct relay, False otherwise. An id that is
            not known to this RelayLayer also yields False.
    """
    if relay_id not in self.relays:
        # Explicit False instead of an implicit None, matching is_dead() and same_target().
        return False
    return self.relays[relay_id].is_direct()
Checks a Relay if it is a sink relay.
Args
relay_id:RelayId- The RelayId representing the Relay that should be checked.
Returns
bool- True if the specific relay is a sink relay, False otherwise.
Expand source code
def is_sink(self, relay_id: RelayId):
    """Checks a Relay if it is a sink relay.

    Args:
        relay_id (RelayId): The RelayId representing the Relay that should be checked.

    Returns:
        bool: True if the specific relay is a sink relay, False otherwise. An id that is
            not known to this RelayLayer also yields False.
    """
    if relay_id not in self.relays:
        # Explicit False instead of an implicit None, matching is_dead() and same_target().
        return False
    return self.relays[relay_id].is_sink()
Merge method of the RelayLayer.
For further information see the thesis of the library or the phd thesis of Setzer.
Args
relays:list- A list of relay ids that should be merged to one relay.
Returns
RelayId or None- The RelayId of the merged Relay or None if merge was not successful.
Expand source code
def merge(self, relays):
    """Merge method of the RelayLayer.

    All given relays are merged into the first one. The merge only happens if every relay
    is known to this layer, the first relay is alive, all relays agree on out id, level,
    alive state and sink rid, none of them has in references, and none of their out keys
    is used by a relay outside the given list. On success the first relay receives the
    union of all out keys and is marked as not validated, references to the other relays
    are redirected to it, and the other relays are removed.

    For further information see the thesis of the library or the phd thesis of Setzer.

    Args:
        relays (list): A list of relay ids that should be merged to one relay. The list
            itself is not modified.

    Returns:
        RelayId or None: The RelayId of the merged Relay or None if merge was not successful
            (this includes lists with fewer than two entries).
    """
    if len(relays) <= 1:
        return None

    # Resolve the ids into a local list so the caller's list is left untouched.
    candidates = []
    for relay_id in relays:
        if relay_id not in self.relays:
            self.logger.warning("Merge failed relay {} not in relays".format(relay_id))
            return None
        candidates.append(self.relays[relay_id])

    merge_relay = candidates[0]
    common_out_id = merge_relay.out_relay.out_id
    common_level = merge_relay.level
    common_alive = merge_relay.alive
    common_sink_rid = merge_relay.sink_rid
    if not common_alive:
        self.logger.warning("Merge failed because not alive common")
        return None

    # Out keys of every relay that is not part of the merge; none of them may be reused.
    foreign_key_sets = [element.out_relay.keys for element in self.relays.copy().values()
                        if element not in candidates]
    out_keys = set()
    for relay in candidates:
        out_reference = relay.out_relay
        if out_reference.out_id != common_out_id or relay.level != common_level \
                or relay.alive != common_alive or relay.sink_rid != common_sink_rid \
                or len(relay.in_relays) > 0:
            self.logger.warning("Merge failed because first check {} {}".format(merge_relay, relay))
            return None
        for key in out_reference.keys:
            out_keys.add(key)
            for diff_keys in foreign_key_sets:
                if key in diff_keys:
                    self.logger.warning(f"Merge failed because key in diff keys: {diff_keys}")
                    return None

    # The merged relay has to be validated again because its key set changed.
    merge_relay.validated = False
    merge_relay.out_relay.keys = out_keys
    merge_relay.out_relay.out_id = common_out_id
    merge_relay.level = common_level
    merge_relay.sink_rid = common_sink_rid

    # merge in references: every relay outside the merge that referenced an absorbed relay
    # now references the merged relay instead
    for relay in candidates:
        if relay == merge_relay:
            continue
        for diff_relay in self.relays.copy().values():
            if diff_relay in candidates:
                continue
            diff_relay.replace_relay_in_references(relay, merge_relay)
        self._remove_relay_completely(relay)

    self.relays[merge_relay.relay_id] = merge_relay
    return merge_relay.relay_id
Monitors transmit rates.
For further information see thesis of this library.
Args
relay:Relay- The Relay for which the rate should be inserted.
Expand source code
def monitor_timeout_transmit_rate(self, relay):
    """Move the current transmit counter of a relay into its rate window.

    Nothing happens for a relay without a transmit counter. Otherwise the counter value is
    appended to the relay's window (created on first use with ModuleConfig.WINDOW_SIZE as
    maximum length) and the counter is reset to 0.

    For further information see thesis of this library.

    Args:
        relay (Relay): The Relay for which the rate should be inserted.
    """
    relay_id = relay.relay_id
    if relay_id not in self.transmit_count:
        return

    if relay_id not in self.relay_windows:
        self.relay_windows[relay_id] = deque(maxlen=ModuleConfig.WINDOW_SIZE)

    counted = self.transmit_count[relay_id]
    self.transmit_count[relay_id] = 0
    self.relay_windows[relay_id].append(counted)
Monitors transmits.
For further information see thesis of this library.
Args
message:Message- The message that is transmitted.
relay:Relay- The Relay that transmitted the message.
Expand source code
def monitor_transmit(self, message, relay):
    """Count one transmit for the given relay.

    Messages whose action type is listed in ModuleConfig.NO_MONITOR_ACTIONS are not counted.

    For further information see thesis of this library.

    Args:
        message (Message): The message that is transmitted.
        relay (Relay): The Relay that transmitted the message.
    """
    if message.action.action_type in ModuleConfig.NO_MONITOR_ACTIONS:
        return

    relay_id = relay.relay_id
    self.transmit_count[relay_id] = self.transmit_count.get(relay_id, 0) + 1
Gets a new relay reference.
The RelayLayer creates a new Relay object and provides the RelayId of it.
Returns
RelayId- of the new Relay object.
Expand source code
def new_relay(self):
    """Create a fresh Relay in this layer and hand out its id.

    The layer's id counter is advanced, a Relay with the resulting RelayId is created and
    registered through add_relay_to_layer.

    Returns:
        RelayId: of the new Relay object.
    """
    self.last_id += 1
    created = Relay(RelayId(self.RID, self.last_id))
    self.add_relay_to_layer(created)
    return created.relay_id
Checks if two relays have the same outgoing connection.
Args
relay_id_1:RelayId- The first relay that should be checked.
relay_id_2:RelayId- The second relay that should be checked.
Returns
bool- True if the specific relays have the same outgoing connection, False otherwise.
Expand source code
def same_target(self, relay_id_1: RelayId, relay_id_2: RelayId):
    """Compare the outgoing connection of two relays.

    Args:
        relay_id_1 (RelayId): The first relay that should be checked.
        relay_id_2 (RelayId): The second relay that should be checked.

    Returns:
        bool: The answer of the first relay's same_target() for the second relay; False when
            at least one of the ids is not known to this layer.
    """
    known = self.relays
    if relay_id_1 not in known or relay_id_2 not in known:
        return False
    first = known[relay_id_1]
    second = known[relay_id_2]
    return first.same_target(second)
Send an action over a specific relay.
For further information see the thesis of this library.
Args
send_relay_id:RelayId- The RelayId representing the Relay which should transmit the action.
action:Action- The action that should be transmitted.
Expand source code
def send(self, send_relay_id: RelayId, action: Action):
    """Send an action over a specific relay.

    For a sink relay (no out id) the action itself is queued for the relay. Otherwise every
    RelayId parameter of the action that names a local relay is replaced by a RelayParameter
    carrying a freshly generated key (an in reference for that key is added to the named
    relay), and the action is queued wrapped in a TransmitMessage. Sending marks the relay
    as validated.

    For further information see the thesis of this library.

    Args:
        send_relay_id (RelayId): The RelayId representing the Relay which should transmit the action.
        action (Action): The action that should be transmitted.

    Returns:
        bool: True if the action was queued. False if the relay is unknown, the relay is not
            alive, or ``action`` is not an Action.
    """
    if send_relay_id not in self.relays:
        return False
    relay = self.relays[send_relay_id]

    if not self.running:
        # Only a warning is emitted here; the action is still processed below.
        self.logger.warning("Cannot send relay layer not running anymore")

    if not isinstance(action, Action):
        # Explicit False instead of the former implicit None.
        return False

    if not relay.alive:
        print("RELAY {}, {} NOT ALIVE!!".format(relay.relay_id, relay.sink_rid))
        self.logger.warning("Cannot send on not alive relay {}".format(relay.relay_id))
        # Explicit False instead of the former implicit None.
        return False

    if not relay.validated:
        relay.validated = True

    if relay.out_relay.out_id is None:
        # Relay is sink relay and should send the action to node
        self.buffer_put_actions.append((relay, action))
    else:
        for index, parameter in enumerate(action.parameters):
            if isinstance(parameter, RelayId):
                if parameter in self.relays:
                    # Hand the receiver a key for the referenced relay instead of the raw id.
                    parameter_relay = self.relays[parameter]
                    key = KeyGeneration.generate_key(self.RID)
                    new_in_ref = InReference(key, None, relay)
                    parameter_relay.add_in_reference(new_in_ref)
                    relay_parameter = RelayParameter(key, parameter_relay.relay_id,
                                                     parameter_relay.level + 1,
                                                     parameter_relay.sink_rid)
                    action.parameters[index] = relay_parameter
                else:
                    # The parameter names a relay this layer does not hold; it is sent as is.
                    print("ERRROR2")
        header = Header(relay.out_relay.keys, relay.relay_id.layer_id, relay.out_relay.out_id)
        message = Message(header, action)
        transmit_message = TransmitMessage(message)
        self.logger.debug(f"Send call with {transmit_message}\n")
        self.buffer_put_actions.append((relay, transmit_message))
    return True
Shutdown the RelayLayer completely without removing Relay connections.
Expand source code
def shutdown(self):
    """Shutdown the RelayLayer completely without removing Relay connections.

    The running flag is cleared, the local relay table is emptied, the buffer put thread is
    joined and the LinkLayer is asked to shut down. The LinkLayer process gets three seconds
    to exit and is killed if it has no exit code after that.
    """
    self.running = False
    self.relays.clear()
    self.buffer_put_thread.join()

    self._call_link_layer_message("shutdown", [])

    link_layer = self.link_layer_process
    link_layer.join(3)
    still_running = link_layer.exitcode is None
    if still_running:
        link_layer.kill()
Stops the Relay Layer.
The method stops by deleting all relays and setting the active state to False.
Expand source code
def stop(self):
    """Stops the Relay Layer.

    delete() is called for every relay held at the time of the call (a snapshot is iterated,
    so deletions do not disturb the loop); afterwards the active state is set to False.
    """
    snapshot = tuple(self.relays.values())
    for relay in snapshot:
        self.delete(relay.relay_id)
    self.active = False
The timeout method of the RelayLayer.
For further information see the thesis of the library or the phd thesis of Setzer.
Expand source code
def timeout(self):
    """The timeout method of the RelayLayer.

    Runs once over a snapshot of all relays and, per relay: updates the transmit-rate
    window, optionally checks for a denial-of-service attack, repairs or deletes relays
    with inconsistent level / out reference / keys, removes invalid in references, pings
    every remote layer that holds an in reference, removes relays that are not alive (or
    no longer needed once the layer is inactive), and finally queues a probe over the
    relay's out reference. When the layer is inactive and no relays are left, the layer
    shuts itself down.

    For further information see the thesis of the library or the phd thesis of Setzer.
    """
    self.logger.debug("Relay length stored: {}".format(len(self.relays)))
    self.logger.debug(f"Relays: [{RelayLogging.relay_list_to_string(self.relays.values())}]")
    for relay in self.relays.copy().values():
        # relay_start / start_time are never read by active code (only benchmark logging,
        # which is disabled, used them).
        relay_start = time.time()
        start_time = time.time_ns()
        self.logger.debug(f"Timeout by {self.RID} for {relay}")
        relay_id = relay.relay_id
        self.monitor_timeout_transmit_rate(relay)
        if ModuleConfig.DOS_DETECTION_ACTIVATED:
            dos_detected = self.check_dos_attack(relay)
            if dos_detected:
                self.logger.error("Denial of Service attack detected in relay {}".format(relay))
                self.delete(relay.relay_id)
                continue
        # A relay whose id names another layer does not belong here.
        if relay_id.layer_id != self.RID:
            self._timeout_delete_subroutine(relay)
            continue
        start_time = time.time_ns()
        # correct level of relay: sink relays (no out id) are level 0, all others at least 1
        if relay.out_relay.out_id is None:
            relay.level = 0
        elif relay.level < 1:
            relay.level = 1
        # has out id or out id is a relay layer id
        out_ref = relay.out_relay
        if not isinstance(out_ref, OutReference) or out_ref.out_id is not None \
                and not isinstance(out_ref.out_id.layer_id, RelayLayerId) \
                and not isinstance(out_ref.out_id, RelayId):
            self._timeout_delete_subroutine(relay)
            continue
        start_time = time.time_ns()
        # A sink relay must not carry out keys; a non-sink relay must carry at least one.
        if out_ref.out_id is None and len(out_ref.keys) > 0:
            out_ref.keys = set()
        if out_ref.out_id is not None and len(out_ref.keys) == 0:
            self._timeout_delete_subroutine(relay)
            continue
        start_time = time.time_ns()
        remove_refs = []
        # correct in references
        for in_ref in relay.in_relays:
            in_ref_key = in_ref.key
            needs_deletion = not KeyGeneration.check_key_origin(in_ref_key, self.RID)
            # check same in references for same key and different rids and relays
            if not needs_deletion:
                for check_in_ref in relay.in_relays:
                    if in_ref_key == check_in_ref.key and in_ref.rid != check_in_ref.rid \
                            and in_ref.relay != check_in_ref.relay:
                        needs_deletion = True
                        break
            # check other relays for the key
            if not needs_deletion:
                for check_relay in self.relays.copy().values():
                    if check_relay != relay and check_relay.has_key_in_in_ref(in_ref_key):
                        needs_deletion = True
                        break
            if needs_deletion:
                print("NEEDS DELETION!!")
                remove_refs += [in_ref]
        # Removal happens after the scan so relay.in_relays is not changed while iterating.
        for in_ref in remove_refs:
            relay.remove_in_reference(in_ref)
        start_time = time.time_ns()
        # ping all relay layers that has an in reference; pings are grouped per remote layer
        pings = {}
        for in_ref in relay.in_relays:
            if in_ref.key != "" and in_ref.rid is not None and in_ref.relay is None:
                action = Ping(relay.relay_id, relay.level, relay.sink_rid, in_ref.key)
                if in_ref.rid in pings:
                    pings[in_ref.rid].append(action)
                else:
                    pings[in_ref.rid] = [action]
        for rid, ping_list in pings.items():
            ping_container = PingAction(ping_list)
            layer_message = LayerMessage(rid, ping_container)
            self.buffer_put_actions.append((self.RID, layer_message))
        remove_refs = []
        for in_ref in relay.in_relays:
            # NOTE(review): with `and`, an entry that is an InReference is never collected
            # here, whatever check_valid() returns - confirm whether `or` was intended.
            if not isinstance(in_ref, InReference) and not in_ref.check_valid():
                remove_refs += [in_ref]
        for in_ref in remove_refs:
            relay.remove_in_reference(in_ref)
        start_time = time.time_ns()
        if not relay.alive:
            relay.clear_in_relays()
            if relay.out_relay.out_id is None:
                self._remove_relay_completely(relay)
                continue
            else:
                if not self._has_relay_for_in_relay(relay):
                    # Tell the target layer that this in relay is closed, then drop it.
                    action = InRelayClosedAction(relay.out_relay.keys, self.RID, relay.out_relay.out_id)
                    layer_message = LayerMessage(relay.out_relay.out_id.layer_id, action)
                    self.buffer_put_actions.append((self.RID, layer_message))
                    self._remove_relay_completely(relay)
                    continue
                else:
                    self.logger.debug("Cannot remove relay because has relay in relay")
        start_time = time.time_ns()
        # Inactive layer: a relay that is a sink, or that nothing references anymore, is
        # removed once the link layer reports its buffer as empty.
        if not self.active and (out_ref.out_id is None or len(relay.in_relays) == 0
                                and not self._has_relay_for_in_relay(relay)):
            if self._call_link_layer_message("is_buffer_empty", [relay.relay_id], True):
                self._remove_relay_completely(relay)
                continue
        elif not self.active:
            self.logger.debug("Cannot remove relay {}".format(relay))
        start_time = time.time_ns()
        # Out keys must be unique among validated relays: on a clash the relay with the
        # smaller numeric id is deleted.
        for check_relay in self.relays.copy().values():
            if not check_relay.validated:
                continue
            check_out_keys = check_relay.out_relay.keys
            relay_out_keys = out_ref.keys
            intersect = relay_out_keys.intersection(check_out_keys)
            if check_relay != relay and len(intersect) > 0:
                if check_relay.relay_id.relay_id > relay.relay_id.relay_id:
                    self._timeout_delete_subroutine(relay)
                    # NOTE(review): this `continue` only advances the inner loop; the rest
                    # of the outer iteration (probing) still runs for `relay` after the
                    # delete subroutine - confirm whether that is intended.
                    continue
        key_sequence = [out_ref.keys]
        control_keys = []
        start_time = time.time_ns()
        # Collect the keys of local in references pointing to this relay that the link
        # layer does not report as present in the referencing relay's buffer.
        for check_relay in self.relays.copy().values():
            for check_in_ref in check_relay.in_relays:
                if check_in_ref.rid is None and check_in_ref.relay == relay and check_in_ref.key != "":
                    check_key = check_in_ref.key
                    is_in_buffer = self._call_link_layer_message("check_key_in_relay_buffer",
                                                                 [check_relay.relay_id, check_key], True)
                    if not is_in_buffer:
                        control_keys.append(check_key)
        start_time = time.time_ns()
        # Probe the out reference of every non-sink relay that is still in use.
        if (self.active or len(relay.in_relays) > 0 or len(control_keys) > 0) and out_ref.out_id is not None:
            header = Header(out_ref.keys, relay_id.layer_id, out_ref.out_id)
            action = ProbeAction(control_keys, key_sequence)
            message = Message(header, action)
            transmit_message = TransmitMessage(message)
            start_time = time.time_ns()
            self.buffer_put_actions.append((relay, transmit_message))
        start_time = time.time_ns()
    # An inactive layer without relays has nothing left to do.
    if not self.active and len(self.relays.values()) == 0:
        self.shutdown()
Validates a given Relay.
The method validates the Relay by setting its validated flag to True.
Args
relay_id:RelayId- The RelayId of the Relay that should be validated.
Expand source code
def validate_relay(self, relay_id: RelayId):
    """Mark a relay as validated.

    Sets the validated flag of the relay to True; an id that is not known to this layer is
    ignored.

    Args:
        relay_id (RelayId): The RelayId of the Relay that should be validated.
    """
    if relay_id not in self.relays:
        return
    self.relays[relay_id].validated = True