Module RelayModel.Node
Expand source code
import multiprocessing
import queue
import threading
import time
import zmq
from RelayModel import ModuleConfig, RelayLogging
from RelayModel.Communication import Action
from RelayModel.RelayId import RelayLayerId, RelayId
from RelayModel.RelayLayer import RelayLayer
class Node:
    """The Node class represents a Node in a distributed System.

    The Node class is the main class for creating distributed protocols with the relay model.
    This class implements functionality to work with the relay model and leave a system without harming its
    connectivity. It implements the protocol provided by Setzer in his phd thesis:
    (https://digital.ub.uni-paderborn.de/urn/urn:nbn:de:hbz:466:2-37849).

    Attributes:
        node_id (int): Stores the node id that was passed to the constructor.
        buffer (multiprocessing.Queue): Defines a buffer for the node where actions can be inserted by the
            RelayLayer. Every action in this buffer gets processed by the node.
        relay_layer (RelayLayer): Stores a RelayLayer object for this node.
        running (bool): Stores the state of the node. While True the message and timeout loops keep running; when
            set to False both loops terminate.
        timeout_thread (threading.Thread): Stores the timeout thread, which periodically inserts a timeout action
            into the buffer.
        message_thread (threading.Thread): Stores the message thread of the node. The thread periodically checks if
            there is a new Action in the buffer and executes the Action.
        logger (logging.Logger): Stores the Logger class of the Node.
        timeout_period (float): Stores the period of the timeout execution. By default this is set to the value set
            in the ModuleConfig.
        leaving (bool): Stores the leaving state of the node. When set to True the Node changes its behaviour where
            it tries to leave the system by the protocol provided by Setzer.
        in_ref (RelayId): Stores a relay id to the main sink relay of this node.
        N (list): Stores a list which is a pseudo variable for all relays stored in an underlying protocol.
        D (list): Stores a list of RelayIds which should be deleted in the protocol.
        a_out (RelayId): Stores the RelayId of the outgoing anchor.
        a_in (RelayId): Stores the RelayId of the incoming anchor.
        analyse_mode (bool): Handles the analyse mode of the node. When set to True the Node is executed in analyse
            mode and sends every timeout a state to a StateMonitor. By default this is set to False and can be
            activated either in the constructor or by changing the attribute.
    """

    def __init__(self, node_id: int, ip: str, port: int, analyse_mode=False):
        """Create a Node object with the given node id, ip and port.

        The constructor initializes a RelayLayer and creates all necessary threads.
        The threads are not started until the `start` method is called.

        Args:
            node_id (int): Defines the node id of a node to form a topology.
            ip (str): Defines the ip address where the node should be reachable on.
            port (int): Defines the port where the node should be reachable on.
            analyse_mode (bool, optional): Optionally the analyse mode can be turned on while creating the node. By
                default this parameter is set to False.
        """
        # Kept on the node as well because `timeout` logs it while the node is leaving.
        self.node_id = node_id
        self.buffer = multiprocessing.Queue()
        layer_id = RelayLayerId(ip, port)
        layer_id.node_id = node_id
        self.relay_layer = RelayLayer(layer_id, self.buffer)
        self.running = True
        self.timeout_thread = threading.Thread(target=self._start_timeout_thread, daemon=True)
        self.message_thread = threading.Thread(target=self._start_message_handling, daemon=True)
        self.logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL, "node_{}_{}".format(ip, port))
        self.timeout_period = ModuleConfig.NODE_TIMEOUT_PERIOD
        self.leaving = False
        self.in_ref = None
        self.N = []
        self.D = []
        self.a_out = None
        self.a_in = None
        self.analyse_mode = analyse_mode
        self._context = zmq.Context()
        self.logger.debug("Start node")

    def _start_message_handling(self):
        """Run the message loop: take entries from the buffer and execute them until `running` is False."""
        while self.running:
            try:
                # Block for at most 0.1s. A non-blocking get ignores the timeout and would turn this
                # loop into a busy wait; the short timeout keeps the loop responsive to `running`.
                message = self.buffer.get(True, 0.1)
            except queue.Empty:
                continue
            try:
                self.logger.debug("buffer size: {}".format(self.buffer.qsize()))
            except NotImplementedError:
                # qsize() is not available on every platform; the size is only diagnostic output.
                pass
            if isinstance(message, Action):
                self.replace_action(message)
            else:
                self.logger.error("No supported class found for message {}".format(message))
        self.logger.debug("Stopped Node Thread")

    def _start_timeout_thread(self):
        """Run the timeout loop: enqueue a 'timeout' action every `timeout_period` seconds."""
        while self.running:
            # The timeout is executed by the message thread, so it is enqueued instead of called directly.
            action = Action('timeout', [])
            self.buffer.put(action)
            time.sleep(self.timeout_period)
        self.logger.debug("Stopped Timeout Thread")

    def timeout(self):
        """The Timeout method is called periodically and corrects all values in the node.

        The method is correcting every value in the node or if the node is leaving it will prepare the node for
        leaving. For further information about the protocol see the thesis referenced in the class docstring.
        """
        self.logger.debug("in_ref " + str(self.in_ref))
        self.logger.debug('a_in ' + str(self.a_in))
        self.logger.debug('a_out ' + str(self.a_out))

        # Forget every reference to a relay that no longer exists in the relay layer.
        # Iterate over a snapshot: removing from the live list while iterating it skips elements.
        for relay in list(self.get_relays_from_original_variables()):
            if not self.relay_layer.check_relay_exists(relay):
                self.remove_from_original_variables(relay)
        if self.in_ref is not None and not self.relay_layer.check_relay_exists(self.in_ref):
            self.in_ref = None
        if self.a_out is not None and not self.relay_layer.check_relay_exists(self.a_out):
            self.a_out = None
        if self.a_in is not None and not self.relay_layer.check_relay_exists(self.a_in):
            self.a_in = None

        # Every validated relay that is not referenced by any variable is scheduled for deletion.
        validated_relays = self.relay_layer.get_validated_relays()
        for relay in validated_relays:
            if relay not in self.D and not self.check_in_original_variables(relay) and relay != self.in_ref \
                    and relay != self.a_in and relay != self.a_out:
                self.D.append(relay)

        # in_ref always has to be a sink relay; replace it otherwise.
        if self.in_ref is None or not self.relay_layer.is_sink(self.in_ref):
            if self.in_ref is not None:
                self.D.append(self.in_ref)
            self.in_ref = self.relay_layer.new_relay()

        if not self.leaving:
            for relay in list(self.get_relays_from_original_variables()):
                if not self.relay_layer.is_direct(relay):
                    self.remove_from_original_variables(relay)
                    self.D.append(relay)
            # Anchors are dropped here because the node is not leaving.
            if self.a_in is not None:
                self.D.append(self.a_in)
                self.a_in = None
            if self.a_out is not None:
                self.D.append(self.a_out)
                self.a_out = None
            for relay in self.D.copy():
                if not self.relay_layer.has_incoming(relay):
                    self.D.remove(relay)
                    self.reversal_of_relay(relay)
                    self.relay_layer.delete(relay)
            time_start = time.time()
            self.original_timeout()
            self.logger.debug("Orig timeout time: {}".format(time.time() - time_start))
        else:
            # A leaving node gives up all relays of the underlying protocol.
            for relay in list(self.get_relays_from_original_variables()):
                self.remove_from_original_variables(relay)
                self.D.append(relay)
            if self.a_in is not None and not self.relay_layer.is_sink(self.a_in):
                self.D.append(self.a_in)
                self.a_in = self.relay_layer.new_relay()
            if self.a_in is None:
                self.a_in = self.relay_layer.new_relay()
            if self.a_out is not None:
                if self.relay_layer.has_incoming(self.a_out) or not self.relay_layer.is_direct(self.a_out) \
                        or self.relay_layer.is_sink(self.a_out):
                    self.D.append(self.a_out)
                    self.a_out = None
            for relay in self.D.copy():
                if not self.relay_layer.has_incoming(relay) and relay != self.a_out:
                    action = Action('ask_to_reverse', [self.in_ref])
                    self.relay_layer.send(relay, action)
                    self.D.remove(relay)
                    self.logger.debug("Delete relay: {}".format(relay))
                    self.relay_layer.delete(relay)
            if not self.D and not self.relay_layer.has_incoming(self.in_ref) \
                    and not self.relay_layer.has_incoming(self.a_in):
                # Nothing is left to delete and nothing references this node anymore.
                self.shutdown()
                return
            self.logger.debug("LEAVING CHECK: {} {} {} {}".format(self.node_id, len(self.D),
                                                                 self.relay_layer.has_incoming(self.in_ref),
                                                                 self.relay_layer.has_incoming(self.a_in)))
            if self.a_out is not None:
                if not self.relay_layer.has_incoming(self.a_in):
                    action = Action("ask_to_reverse_anchor", [self.a_in])
                    self.relay_layer.send(self.a_out, action)
                # NOTE(review): the indentation of this file was lost; notify_anchor is assumed to be sent
                # whenever a_out is set, not only together with ask_to_reverse_anchor - confirm.
                action = Action("notify_anchor", [])
                self.relay_layer.send(self.a_out, action)

        # NOTE(review): assumed to run for staying and leaving nodes alike ("every timeout") - confirm.
        if self.analyse_mode:
            # Send local state to monitor
            done = self.send_analyse_state()
            if done:
                self.shutdown()

    def original_timeout(self):
        """Can be overridden to implement a underlying protocol timeout.

        This method is automatically executed at the end of every timeout of this node class while the node is
        not leaving. It should be implemented with the timeout of the underlying protocol.

        Example:
            See `SortedListNode`
        """
        pass

    def reversal_of_relay(self, relay_id: RelayId):
        """Should be overridden to reverse a specific relay reference.

        The method should just send a node action from the underlying protocol over the given relay with the
        in_ref as parameter.

        Example:
            See `SortedListNode`.
            Example for the linearize Action
                self.call_method(relay_id, 'linearize', [self.in_ref])

        Args:
            relay_id (RelayId): Defines the relay whose reference should be reversed.
        """
        pass

    def check_in_original_variables(self, relay_id: RelayId):
        """Should be overridden to check if a specific relay is in one of the underlying protocol variables.

        By default it checks if the relay is in the attribute N. This should be changed to a better implementation
        when implementing a underlying protocol.

        Example:
            See `SortedListNode` implementation.

        Args:
            relay_id (RelayId): Defines the RelayId that should be checked.

        Returns:
            bool: True if the relay id is contained in N, False otherwise.
        """
        return relay_id in self.N

    def remove_from_original_variables(self, relay_id: RelayId):
        """Should be overridden to remove relays from underlying protocol variables.

        By default it removes the given relay id from the N attribute. This should be changed to a better
        implementation when implementing a underlying protocol.

        Example:
            See `SortedListNode` implementation.

        Args:
            relay_id (RelayId): Defines the RelayId that should be removed from the variables.

        Raises:
            ValueError: If the relay id is not contained in N.
        """
        self.N.remove(relay_id)

    def get_relays_from_original_variables(self):
        """Should be overridden to get all relays from underlying protocol variables.

        This normally just returns the N attribute list. This should be changed to a better implementation when
        implementing a underlying protocol. It should return a list of RelayIds that are stored in any variable
        defined in the protocol.

        Example:
            See `SortedListNode` implementation.

        Returns:
            list: A list of RelayIds contained in variables of the protocol. The default implementation returns
            the N list itself, not a copy.
        """
        return self.N

    def send_analyse_state(self):
        """Should be overridden to send a state to a StateMonitor.

        In this method a State object should be created. After that it should be sent with the
        `send_state_to_monitor` method. `timeout` shuts the node down when this method returns a truthy value.

        Example:
            See `SortedListNode` implementation.
        """
        pass

    def replace_action(self, action):
        """This method is called whenever the node received a Action.

        The method executes the method defined in the received action if the node is staying or reverses the
        connection if the node is leaving.

        Args:
            action (Action): The received action that should be executed.
        """
        action_name = action.action_type.lower()
        if not self.leaving or action_name == "timeout" or action_name == "reverse" \
                or action_name == "ask_to_reverse" or action_name == "ask_to_reverse_anchor" \
                or action_name == "notify_anchor":
            # Work on a copy so the received action's own parameter list is left untouched.
            action_parameters = list(action.parameters)
            if action_name == "ask_to_reverse_anchor":
                action_parameters.append(action.receiving_relay)
            # NOTE(review): the action type selects the method by name, so a sender can reach any attribute
            # of the node - confirm that only trusted peers can deliver actions.
            try:
                action_function = getattr(self, action_name)
            except AttributeError:
                self.logger.warning("Action with action type {} not found!".format(action_name))
            else:
                if callable(action_function):
                    action_function(*action_parameters)
                else:
                    self.logger.warning(
                        "Action with action type {} is not callable or a function!".format(action_name))
        else:
            # A leaving node does not execute protocol actions. Every relay it was handed is asked to
            # reverse towards the relay the action arrived on and is then deleted locally.
            receiving_relay = action.receiving_relay
            if receiving_relay is None:
                self.logger.warning("Receiving relay was empty for action {}".format(action))
            else:
                for parameter in action.parameters:
                    if isinstance(parameter, RelayId):
                        reverse_request = Action('ask_to_reverse', [receiving_relay])
                        self.relay_layer.send(parameter, reverse_request)
                        self.relay_layer.delete(parameter)

    def ask_to_reverse(self, out: RelayId):
        """Implements the ask_to_reverse protocol method.

        For further information see the phd thesis.

        Args:
            out (RelayId): Defines the RelayId that should be reversed.
        """
        self.logger.debug("Calling ask_to_reverse with {}".format(out))
        if not self.leaving:
            # Iterate over a snapshot: removing from the live list while iterating it skips elements.
            for v in list(self.get_relays_from_original_variables()):
                if self.relay_layer.same_target(out, v):
                    self.remove_from_original_variables(v)
                    self.D.append(v)
            action = Action("reverse", [self.in_ref])
            self.relay_layer.send(out, action)
            self.relay_layer.delete(out)
        else:
            if self.a_out is None:
                action = Action('ask_to_reverse', [self.in_ref])
                self.relay_layer.send(out, action)
                self.relay_layer.delete(out)
            else:
                if self.relay_layer.same_target(out, self.a_out):
                    new_relay = self.relay_layer.merge([self.a_out, out])
                    if new_relay:
                        self.a_out = None
                        action = Action('ask_to_reverse', [self.in_ref])
                        self.relay_layer.send(new_relay, action)
                else:
                    action = Action('reverse', [self.a_out])
                    self.relay_layer.send(out, action)
                    self.relay_layer.delete(out)

    def ask_to_reverse_anchor(self, out: RelayId, receiving_relay: RelayId):
        """Implements the ask_to_reverse_anchor protocol method.

        For further information see the phd thesis.

        Args:
            out (RelayId): Defines a Relay.
            receiving_relay (RelayId): Defines the relay that received this action.
        """
        self.logger.debug("Calling ask_to_reverse_anchor with {}\n {}".format(out, receiving_relay))
        if not self.leaving:
            action = Action('reverse', [self.in_ref])
            self.relay_layer.send(out, action)
            self.relay_layer.delete(out)
        else:
            if receiving_relay is None:
                self.logger.warning("Receiving relay was empty for action ask to reverse anchor")
            else:
                action = Action('ask_to_reverse', [receiving_relay])
                self.relay_layer.send(out, action)
                # NOTE(review): the indentation of this file was lost; the delete is assumed to belong to
                # this branch, so `out` is only deleted after the request was sent - confirm.
                self.relay_layer.delete(out)

    def notify_anchor(self):
        """Implements the notify_anchor protocol method.

        For further information see the phd thesis.
        """
        self.logger.debug("Calling notify anchor")
        if self.leaving:
            # The previous incoming anchor is scheduled for deletion and replaced by a fresh relay.
            if self.a_in is not None:
                self.D.append(self.a_in)
            self.a_in = self.relay_layer.new_relay()

    def reverse(self, out: RelayId):
        """Implements the reverse protocol method.

        For further information see the phd thesis.

        Args:
            out (RelayId): Defines a Relay.
        """
        self.logger.debug("Calling reverse with {}".format(out))
        if not self.leaving:
            self.reversal_of_relay(out)
            self.relay_layer.delete(out)
        else:
            if self.a_out is None:
                if self.relay_layer.is_dead(out):
                    self.logger.warning("Reverse relay is not alive {}".format(out))
                if self.relay_layer.is_direct(out):
                    self.a_out = out
                else:
                    # Ask out to send a direct relay
                    action = Action('ask_to_reverse', [self.in_ref])
                    self.relay_layer.send(out, action)
                    self.relay_layer.delete(out)
            else:
                if self.relay_layer.same_target(out, self.a_out):
                    merged = self.relay_layer.merge([self.a_out, out])
                    if merged:
                        self.a_out = merged
                else:
                    action = Action('ask_to_reverse', [self.a_out])
                    self.relay_layer.send(out, action)
                    self.relay_layer.delete(out)

    def call_method(self, relay_id: RelayId, method: str, parameters: list):
        """Calls a method of another node connected with the given relay.

        This method calls the given method with the given parameters in the node that handles the sink relay that
        is connected with the relay defined by the given relay id.
        It calls the send method from the RelayLayer to send the method to the node. A failed send is logged as an
        error.

        Args:
            relay_id (RelayId): Defines the Relay which is used to send the action.
            method (str): Defines the name of the method that should be executed in the other node.
            parameters (list): Defines a list of parameters used to execute the method.
        """
        action = Action(method, parameters)
        result = self.relay_layer.send(relay_id, action)
        if not result:
            self.logger.error("SEND CALL FAILED FOR RELAY {}".format(relay_id))

    def start(self):
        """Starts the node by starting all threads.

        The node gets started by starting the message thread and the timeout thread.
        """
        self.logger.debug("Start node in start")
        self.message_thread.start()
        self.timeout_thread.start()

    def send_state_to_monitor(self, state):
        """Sends the given state to the StateMonitor.

        The method sends the given state to the address defined in the ModuleConfig and waits for the reply.

        Args:
            state (Object): Defines the state that should be sent. This should be a object of a state class.

        Returns:
            The object the StateMonitor replied with.
        """
        socket = self._context.socket(zmq.REQ)
        try:
            end_point = "tcp://" + ModuleConfig.STATE_MONITOR_ADDRESS
            socket.connect(end_point)
            socket.send_pyobj(state)
            # NOTE(review): recv_pyobj unpickles whatever the peer sends - only use with a trusted monitor.
            return socket.recv_pyobj()
        finally:
            # Always release the socket, also when sending or receiving fails.
            socket.setsockopt(zmq.LINGER, 0)
            socket.close()

    def shutdown(self):
        """Completely shutdown the node.

        The method ends the message and timeout loops by setting the running state to False.
        After that it stops the RelayLayer, waits for the RelayLayer timeout thread and the node timeout thread and
        closes the buffer.
        """
        self.running = False
        if self.analyse_mode:
            self.send_analyse_state()
        self.relay_layer.stop()
        self.relay_layer.timeout_thread.join()
        # Joining a thread that was never started raises RuntimeError.
        if self.timeout_thread.is_alive():
            self.timeout_thread.join()
        self.buffer.close()
        self.buffer.join_thread()

    def stop(self):
        """Stopping the node so that the node should leave the system.

        The method sets the leaving state to True and activates the leaving protocol of the node.
        """
        self.logger.debug("Leaving the system")
        self.leaving = True
Classes
class Node (node_id: int, ip: str, port: int, analyse_mode=False)-
The Node class represents a Node in a distributed System.
The Node class is the main class for creating distributed protocols with the relay model. This class implements functionality to work with the relay model and leave a system without harming its connectivity. It implements the protocol provided by Setzer in his phd thesis: (https://digital.ub.uni-paderborn.de/urn/urn:nbn:de:hbz:466:2-37849).
Attributes
buffer:multiprocessing.Queue- Defines a buffer for the node where actions can be inserted by the RelayLayer. Every action in this buffer gets processed by the node.
relay_layer:RelayLayer- Stores a RelayLayer object for this node.
running:bool- Stores the state of the node. If True the node is running if False the node is not running and all threads are stopped.
timeout_thread:threading.Thread- Stores the timeout thread, which periodically inserts timeout actions into the message buffer.
message_thread:threading.Thread- Stores the message thread of the node. The thread periodically checks if there is a new Action in the node_buffer and executes the Action.
logger:logging.Logger- Stores the Logger class of the Node.
timeout_period:float- Stores the period of the timeout execution. By default this is set to the value set in the ModuleConfig.
leaving:bool- Stores the leaving state of the node. When set to True the Node changes its behaviour where it tries to leave the system by the protocol provided by Setzer.
in_ref:RelayId- Stores a relay id to the main sink relay of this node.
N:list- Stores a list which is a pseudo variable for all relays stored in an underlying protocol.
D:list- Stores a list of RelayIds which should be deleted in the protocol.
a_out:RelayId- Stores the RelayId of the outgoing anchor.
a_in:RelayId- Stores the RelayId of the incoming anchor.
analyse_mode:bool- Handles the analyse mode of the node. When set to True the Node is executed in analyse mode and sends every timeout a state to a StateMonitor. By default this is set to False and can be activated either in the constructor or by changing the attribute.
The constructor creates a Node object with given node id, ip and port.
When creating a Node object the constructor initializes a RelayLayer and creates all necessary threads. The threads are not started until the `start` method is called.
Args
node_id:int- Defines the node id of a node to form a topology.
ip:str- Defines the ip address where the node should be reachable on.
port:int- Defines the port where the node should be reachable on.
analyse_mode:bool, optional- Optionally the analyse mode can be turned on while creating the node. By default this parameter is set to False.
Expand source code
class Node: """The Node class represents a Node in a distributed System. The Node class is the main class for creating distributed protocols with the relay model. This class implements functionality to work with the relay model and leave a system without harming its connectivity. It implements the protocol provided by Setzer in his phd thesis: (https://digital.ub.uni-paderborn.de/urn/urn:nbn:de:hbz:466:2-37849). Attributes: buffer (multiprocessing.Queue): Defines a buffer for the node where actions can be inserted by the RelayLayer. Every action in this buffer gets processed by the node. relay_layer (RelayLayer): Stores a RelayLayer object for this node. running (bool): Stores the state of the node. If True the node is running if False the node is not running and all threads are stopped. timeout_thread (threading.Thread): Stores the timeout thread where timeout action were inserted in the message buffer. message_thread (threading.Thread): Stores the message thread of the node. The thread periodically checks if there is a new Action in the node_buffer and executes the Action. logger (logging.Logger): Stores the Logger class of the Node. timeout_period (float): Stores the period of the timeout execution. By default this is set to the value set in the ModuleConfig. leaving (bool): Stores the leaving state of the node. When set to True the Node changes its behaviour where it tries to leave the system by the protocol provided by Setzer. in_ref (RelayId): Stores a relay id to the main sink relay of this node. N (list): Stores a list which is a pseudo variable for all relays stored in an underlying protocol. D (list): Stores a list of RelayIds which should be deleted in the protocol. a_out (RelayId): Stores the RelayId of the outgoing ancher. a_in (RelayId): Stores the RelayId of the incoming ancher. analyse_mode (bool): Handles the analyse mode of the node. When set to True the Node is executed in analyse mode and sends every timeout a state to a StateMonitor. 
By default this is set to False and can be activated either in the constructor or by changing the attribute. """ def __init__(self, node_id: int, ip: str, port: int, analyse_mode=False): """The constructor creates a Node object with given node id, ip and port. When creating a Node object the constructor initializes a RelayLayer and creates all necessary threads. All threads are not started until the `start` method is called. Args: node_id (int): Defines the node id of a node to form a topology. ip (str): Defines the ip address where the node should be reachable on. port (int): Defines the port where the node should be reachable on. analyse_mode (bool, optional): Optionally the analyse mode can be turned on while creating the node. By default this parameter is set to False. """ self.buffer = multiprocessing.Queue() layer_id = RelayLayerId(ip, port) layer_id.node_id = node_id self.relay_layer = RelayLayer(layer_id, self.buffer) self.running = True self.timeout_thread = threading.Thread(target=self._start_timeout_thread, daemon=True) self.message_thread = threading.Thread(target=self._start_message_handling, daemon=True) self.logger = RelayLogging.get_logger(ModuleConfig.RELAY_LOG_LEVEL, "node_{}_{}".format(ip, port)) self.timeout_period = ModuleConfig.NODE_TIMEOUT_PERIOD self.leaving = False self.in_ref = None self.N = [] self.D = [] self.a_out = None self.a_in = None self.analyse_mode = analyse_mode self._context = zmq.Context() self.logger.debug("Start node") def _start_message_handling(self): while self.running: try: message = self.buffer.get(False, 0.1) self.logger.debug("buffer size: {}".format(self.buffer.qsize())) if isinstance(message, Action): self.replace_action(message) else: self.logger.error("No supported class found for message {}".format(message)) except queue.Empty: pass self.logger.debug("Stopped Node Thread") def _start_timeout_thread(self): while self.running: action = Action('timeout', []) self.buffer.put(action) # self.timeout() 
time.sleep(self.timeout_period) self.logger.debug("Stopped Timeout Thread") def timeout(self): """The Timeout method is called periodically and corrects all values in the node. The method is correcting every value in the node or if the node is leaving it will prepare the node for leaving. For further information about the protocol see the thesis of this paper. """ self.logger.debug("in_ref " + str(self.in_ref)) self.logger.debug('a_in ' + str(self.a_in)) self.logger.debug('a_out ' + str(self.a_out)) for relay in self.get_relays_from_original_variables(): if not self.relay_layer.check_relay_exists(relay): self.remove_from_original_variables(relay) if self.in_ref is not None and not self.relay_layer.check_relay_exists(self.in_ref): self.in_ref = None if self.a_out is not None and not self.relay_layer.check_relay_exists(self.a_out): self.a_out = None if self.a_in is not None and not self.relay_layer.check_relay_exists(self.a_in): self.a_in = None validated_relays = self.relay_layer.get_validated_relays() for relay in validated_relays: if relay not in self.D and not self.check_in_original_variables(relay) and relay != self.in_ref \ and relay != self.a_in and relay != self.a_out: self.D.append(relay) if self.in_ref is None or not self.relay_layer.is_sink(self.in_ref): if self.in_ref is not None: self.D.append(self.in_ref) self.in_ref = self.relay_layer.new_relay() if not self.leaving: for relay in self.get_relays_from_original_variables(): # if not self.call_relay_layer_method("check_direct", True, (relay,)): if not self.relay_layer.is_direct(relay): self.remove_from_original_variables(relay) self.D.append(relay) if self.a_in is not None: self.D.append(self.a_in) self.a_in = None if self.a_out is not None: self.D.append(self.a_out) self.a_out = None for relay in self.D.copy(): if not self.relay_layer.has_incoming(relay): self.D.remove(relay) self.reversal_of_relay(relay) self.relay_layer.delete(relay) time_start = time.time() self.original_timeout() 
self.logger.debug("Orig timeout time: {}".format(time.time() - time_start)) else: for relay in self.get_relays_from_original_variables(): self.remove_from_original_variables(relay) self.D.append(relay) if self.a_in is not None and not self.relay_layer.is_sink(self.a_in): self.D.append(self.a_in) self.a_in = self.relay_layer.new_relay() if self.a_in is None: self.a_in = self.relay_layer.new_relay() if self.a_out is not None: if self.relay_layer.has_incoming(self.a_out) or not self.relay_layer.is_direct(self.a_out)\ or self.relay_layer.is_sink(self.a_out): self.D.append(self.a_out) self.a_out = None for relay in self.D.copy(): if not self.relay_layer.has_incoming(relay) and relay != self.a_out: action = Action('ask_to_reverse', [self.in_ref]) self.relay_layer.send(relay, action) self.D.remove(relay) self.logger.debug("Delete relay: {}".format(relay)) self.relay_layer.delete(relay) if len(self.D) == 0 and not self.relay_layer.has_incoming(self.in_ref) \ and not self.relay_layer.has_incoming(self.a_in): self.shutdown() return self.logger.debug("LEAVING CHECK: {} {} {}".format(self.node_id, len(self.D), self.relay_layer.has_incoming(self.in_ref), self.relay_layer.has_incoming(self.a_in))) if self.a_out is not None: if not self.relay_layer.has_incoming(self.a_in): action = Action("ask_to_reverse_anchor", [self.a_in]) self.relay_layer.send(self.a_out, action) action = Action("notify_anchor", []) self.relay_layer.send(self.a_out, action) if self.analyse_mode: # Send local state to monitor done = self.send_analyse_state() if done: self.shutdown() def original_timeout(self): """Can be overridden to implement a underlying protocol timeout. This method is automatically executed on the end of every timeout of this node class. It should be implemented with the timeout of the underlying protocol. Example: See `SortedListNode` """ pass def reversal_of_relay(self, relay_id: RelayId): """Should be overridden to reverse a specific relay reference. 
The method should just send a node action from the underlying protocol over the given relay with the in_ref as parameter. Example: See `SortedListNode`. Example for the linearize Action self.call_method(relay_id, 'linearize', [self.in_ref]) """ pass def check_in_original_variables(self, relay_id: RelayId): """Should be overridden to check if a specific relay is in one of the underlying protocol variables. By default it checks if the relay is in the attribute N. This should be changed to a better implementation when implementing a underlying protocol. Example: See `SortedListNode` implementation. Args: relay_id (RelayId): Defines the RelayId that should be checked. """ return relay_id in self.N def remove_from_original_variables(self, relay_id: RelayId): """Should be overridden to remove relays from underlying protocol variables. By default it removes the given relay id from the N attribute. This should be changed to a better implementation when implementing a underlying protocol. Example: See `SortedListNode` implementation. Args: relay_id (RelayId): Defines the RelayId that should be removed from the variables. """ self.N.remove(relay_id) def get_relays_from_original_variables(self): """Should be overridden to get all relays from underlying protocol variables. This normally just returns the N attribute list. This should be changed to a better implementation when implementing a underlying protocol. It should return a list of RelayIds that are stored in any variable define in the protocol. Example: See `SortedListNode` implementation. Returns: list: A list of RelayIds containing in variables of the protocol. """ return self.N def send_analyse_state(self): """Should be overridden to send a state to a StateMonitor. In this method a State object should be created. After that it should be sent with the `send_state_to_monitor` method. Example: See `SortedListNode` implementation. 
""" pass def replace_action(self, action): """This method is called whenever the node received a Action. The method executes the method defined in the received action if the node is staying or reverses the connection if the node is leaving. Args: action (Action): The received action that should be executed. """ # self.logger.debug(f"Calling Replace_action with {action}") action_name = action.action_type.lower() if not self.leaving or action_name == "timeout" or action_name == "reverse" or action_name == "ask_to_reverse" \ or action_name == "ask_to_reverse_anchor" or action_name == "notify_anchor": action_parameters = action.parameters if action_name == "ask_to_reverse_anchor": action_parameters.append(action.receiving_relay) try: action_function = getattr(self, action_name) except AttributeError: self.logger.warning("Action with action type {} not found!".format(action_name)) else: if callable(action_function): action_function(*action_parameters) else: self.logger.warning("Action with action type {} is not callable or a function!".format(action_name)) else: action_parameters = action.parameters receiving_relay = action.receiving_relay if receiving_relay is None: self.logger.warning("Receiving relay was empty for action {}".format(action)) else: for parameter in action_parameters: if isinstance(parameter, RelayId): action = Action('ask_to_reverse', [receiving_relay]) self.relay_layer.send(parameter, action) self.relay_layer.delete(parameter) def ask_to_reverse(self, out: RelayId): """Implements the ask_to_reverse protocol method. For further information see the phd thesis. Args: out (RelayId): Defines the RelayId that should be reversed. 
""" self.logger.debug("Calling ask_to_reverse with {}".format(out)) if not self.leaving: for v in self.get_relays_from_original_variables(): if self.relay_layer.same_target(out, v): self.remove_from_original_variables(v) self.D.append(v) action = Action("reverse", [self.in_ref]) # self.call_relay_layer_method("send", True, [out, action]) # self.call_relay_layer_method("delete", False, [out]) self.relay_layer.send(out, action) self.relay_layer.delete(out) else: if self.a_out is None: action = Action('ask_to_reverse', [self.in_ref]) # self.call_relay_layer_method("send", True, [out, action]) # self.call_relay_layer_method("delete", False, [out]) self.relay_layer.send(out, action) self.relay_layer.delete(out) else: if self.relay_layer.same_target(out, self.a_out): # new_relay = self.call_relay_layer_method("merge", True, [[out, self.a_out]]) new_relay = self.relay_layer.merge([self.a_out, out]) if new_relay: self.a_out = None action = Action('ask_to_reverse', [self.in_ref]) # self.call_relay_layer_method("send", True, [new_relay, action]) self.relay_layer.send(new_relay, action) # else: # self.logger.warning("While merge {} \nand {}\n something went wrong".format(out, self.a_out)) else: action = Action('reverse', [self.a_out]) # self.call_relay_layer_method("send", True, [out, action]) # self.call_relay_layer_method("delete", False, [out]) self.relay_layer.send(out, action) self.relay_layer.delete(out) def ask_to_reverse_anchor(self, out: RelayId, receiving_relay: RelayId): """Implements the ask_to_reverse_anchor protocol method. For further information see the phd thesis. Args: out (RelayId): Defines a Relay. receiving_relay (RelayId): Defines the relay that received this action. 
""" self.logger.debug("Calling ask_to_reverse_anchor with {}\n {}".format(out, receiving_relay)) if not self.leaving: action = Action('reverse', [self.in_ref]) # self.call_relay_layer_method("send", True, [out, action]) # self.call_relay_layer_method("delete", False, [out]) self.relay_layer.send(out, action) self.relay_layer.delete(out) else: if receiving_relay is None: self.logger.warning("Receiving relay was empty for action ask to reverse anchor") else: action = Action('ask_to_reverse', [receiving_relay]) # self.call_relay_layer_method("send", True, [out, action]) self.relay_layer.send(out, action) self.relay_layer.delete(out) def notify_anchor(self): """Implements the notify_anchor protocol method. For further information see the phd thesis. """ self.logger.debug("Calling notify anchor") if self.leaving: if self.a_in is not None: self.D.append(self.a_in) # self.a_in = self.call_relay_layer_method("new_relay", True, [True]) self.a_in = self.relay_layer.new_relay() def reverse(self, out: RelayId): """Implements the reverse protocol method. For further information see the phd thesis. Args: out (RelayId): Defines a Relay. 
""" self.logger.debug("Calling reverse with {}".format(out)) if not self.leaving: self.reversal_of_relay(out) # self.call_relay_layer_method("delete", False, [out]) self.relay_layer.delete(out) else: if self.a_out is None: if self.relay_layer.is_dead(out): print("Reverse relay is not alive {}".format(out)) if self.relay_layer.is_direct(out): self.a_out = out # print(f"OUT: {self.a_out}") else: # Ask out to send a direct relay action = Action('ask_to_reverse', [self.in_ref]) # self.call_relay_layer_method("send", True, [out, action]) # self.call_relay_layer_method("delete", False, [out]) self.relay_layer.send(out, action) self.relay_layer.delete(out) else: if self.relay_layer.same_target(out, self.a_out): start_time = time.time() # merged = self.call_relay_layer_method("merge", True, [[a_out, out]]) merged = self.relay_layer.merge([self.a_out, out]) if merged: self.a_out = merged else: # print("not same target") action = Action('ask_to_reverse', [self.a_out]) # self.call_relay_layer_method("send", True, [out, action]) # self.call_relay_layer_method("delete", False, [out]) self.relay_layer.send(out, action) self.relay_layer.delete(out) def call_method(self, relay_id: RelayId, method: str, parameters: list): """Calls a method of another node connected with the given relay. This method calls the given method with the given parameters in the node that handles the sink relay that is connected with the relay defined by the given relay id. It calls the send method from the RelayLayer to send the method to the node. Args: relay_id (RelayId): Defines the Relay which is used to send the action. method (str): Defines the name of the method that should be executed in the other node. parameters (list): Defines a list of parameters used to execute the method. 
""" action = Action(method, parameters) # self.call_relay_layer_method("send", True, [relay, action]) result = self.relay_layer.send(relay_id, action) if not result: self.logger.error("SEND CALL FAILED FOR RELAY {}".format(relay_id)) def start(self): """Starts the node by starting all threads. The node gets started by starting the message thread and the timeout thread. """ self.logger.debug("Start node in start") self.message_thread.start() self.timeout_thread.start() def send_state_to_monitor(self, state): """Sends the given state to the StateMonitor. The method sends the given state to the address defined in the ModuleConfig. Args: state (Object): Defines the state that should be sent. This should be a object of a state class. """ socket = self._context.socket(zmq.REQ) end_point = "tcp://" + ModuleConfig.STATE_MONITOR_ADDRESS socket.connect(end_point) socket.send_pyobj(state) reply = socket.recv_pyobj() socket.setsockopt(zmq.LINGER, 0) socket.close() return reply def shutdown(self): """Completely shutdown the node. The method stops all threads by setting the running state to False. After that it stops the RelayLayer and waits until the RelayLayer deleted all Relays. """ # self.logger.warning("Stopping") self.running = False # print(time.time() - self.start_time) if self.analyse_mode: self.send_analyse_state() self.relay_layer.stop() self.relay_layer.timeout_thread.join() self.timeout_thread.join() self.buffer.close() self.buffer.join_thread() # self.relay_layer.shutdown() def stop(self): """Stopping the node so that the node should leave the system. The method sets the leaving state to True and activates the leaving protocol of the node. """ self.logger.debug("Leaving the system") self.leaving = TrueSubclasses
Methods
def ask_to_reverse(self, out: RelayId)-
Implements the ask_to_reverse protocol method.
For further information see the phd thesis.
Args
out:RelayId- Defines the RelayId that should be reversed.
Expand source code
def ask_to_reverse(self, out: RelayId):
    """Implements the ask_to_reverse protocol method.

    Staying node: every relay of the underlying protocol variables that has the same target as ``out`` is
    moved from those variables into ``D``. Afterwards a ``reverse`` action carrying ``in_ref`` is sent over
    ``out`` and ``out`` is deleted.

    Leaving node:
        * No anchor (``a_out`` is None): an ``ask_to_reverse`` action carrying ``in_ref`` is sent over
          ``out`` and ``out`` is deleted.
        * ``out`` has the same target as ``a_out``: both relays are merged. If the merge returns a relay,
          ``a_out`` is cleared and ``ask_to_reverse`` carrying ``in_ref`` is sent over the merged relay.
          A failed merge changes nothing.
        * Otherwise a ``reverse`` action carrying ``a_out`` is sent over ``out`` and ``out`` is deleted.

    For further information see the phd thesis.

    Args:
        out (RelayId): Defines the RelayId that should be reversed.
    """
    self.logger.debug("Calling ask_to_reverse with {}".format(out))

    if not self.leaving:
        # Iterate over a snapshot: get_relays_from_original_variables() may hand out the live list (N by
        # default) and removing from a list while iterating it skips the element after each removal.
        for candidate in list(self.get_relays_from_original_variables()):
            if self.relay_layer.same_target(out, candidate):
                self.remove_from_original_variables(candidate)
                self.D.append(candidate)
        self.relay_layer.send(out, Action("reverse", [self.in_ref]))
        self.relay_layer.delete(out)
        return

    if self.a_out is None:
        self.relay_layer.send(out, Action('ask_to_reverse', [self.in_ref]))
        self.relay_layer.delete(out)
        return

    if self.relay_layer.same_target(out, self.a_out):
        merged_relay = self.relay_layer.merge([self.a_out, out])
        if merged_relay:
            self.a_out = None
            self.relay_layer.send(merged_relay, Action('ask_to_reverse', [self.in_ref]))
        return

    self.relay_layer.send(out, Action('reverse', [self.a_out]))
    self.relay_layer.delete(out)
Implements the ask_to_reverse_anchor protocol method.
For further information see the phd thesis.
Args
out:RelayId- Defines a Relay.
receiving_relay:RelayId- Defines the relay that received this action.
Expand source code
def ask_to_reverse_anchor(self, out: RelayId, receiving_relay: RelayId):
    """Implements the ask_to_reverse_anchor protocol method.

    A staying node answers with a ``reverse`` action carrying ``in_ref``. A leaving node answers with an
    ``ask_to_reverse`` action carrying ``receiving_relay``; if ``receiving_relay`` is None it only logs a
    warning. Whenever an answer is sent over ``out``, ``out`` is deleted afterwards.

    For further information see the phd thesis.

    Args:
        out (RelayId): Defines a Relay.
        receiving_relay (RelayId): Defines the relay that received this action.
    """
    self.logger.debug("Calling ask_to_reverse_anchor with {}\n {}".format(out, receiving_relay))

    if self.leaving:
        if receiving_relay is None:
            self.logger.warning("Receiving relay was empty for action ask to reverse anchor")
            return
        answer = Action('ask_to_reverse', [receiving_relay])
    else:
        answer = Action('reverse', [self.in_ref])

    self.relay_layer.send(out, answer)
    self.relay_layer.delete(out)
Calls a method of another node connected with the given relay.
This method calls the given method with the given parameters in the node that handles the sink relay that is connected with the relay defined by the given relay id. It calls the send method from the RelayLayer to send the method to the node.
Args
relay_id:RelayId- Defines the Relay which is used to send the action.
method:str- Defines the name of the method that should be executed in the other node.
parameters:list- Defines a list of parameters used to execute the method.
Expand source code
def call_method(self, relay_id: RelayId, method: str, parameters: list):
    """Calls a method of another node connected with the given relay.

    An Action is built from ``method`` and ``parameters`` and handed to the ``send`` method of the RelayLayer
    together with ``relay_id``. If ``send`` returns a falsy result an error is logged.

    Args:
        relay_id (RelayId): Defines the Relay which is used to send the action.
        method (str): Defines the name of the method that should be executed in the other node.
        parameters (list): Defines a list of parameters used to execute the method.
    """
    delivered = self.relay_layer.send(relay_id, Action(method, parameters))
    if delivered:
        return
    self.logger.error("SEND CALL FAILED FOR RELAY {}".format(relay_id))
Should be overridden to check if a specific relay is in one of the underlying protocol variables.
By default it checks if the relay is in the attribute N. This should be changed to a better implementation when implementing an underlying protocol.
Example
See the SortedListNode implementation.
Args
relay_id:RelayId- Defines the RelayId that should be checked.
Expand source code
def check_in_original_variables(self, relay_id: RelayId):
    """Should be overridden to check if a relay is stored in one of the underlying protocol variables.

    The default implementation only looks into the attribute ``N``.

    Example:
        See `SortedListNode` implementation.

    Args:
        relay_id (RelayId): Defines the RelayId that should be checked.

    Returns:
        bool: True if ``relay_id`` is contained in ``N``, otherwise False.
    """
    protocol_relays = self.N
    return relay_id in protocol_relays
Should be overridden to get all relays from underlying protocol variables.
This normally just returns the N attribute list. This should be changed to a better implementation when implementing an underlying protocol. It should return a list of RelayIds that are stored in any variable defined in the protocol.
Example
See the SortedListNode implementation.
Returns
list- A list of RelayIds contained in the variables of the protocol.
Expand source code
def get_relays_from_original_variables(self):
    """Should be overridden to get all relays from underlying protocol variables.

    The default implementation hands out the ``N`` attribute itself (not a copy), so changes made to the
    returned list are changes to ``N``.

    Example:
        See `SortedListNode` implementation.

    Returns:
        list: The RelayIds stored in the variables of the protocol.
    """
    protocol_relays = self.N
    return protocol_relays
Implements the notify_anchor protocol method.
For further information see the phd thesis.
Expand source code
def notify_anchor(self):
    """Implements the notify_anchor protocol method.

    A staying node ignores the call. A leaving node moves its current ``a_in`` relay (if any) into ``D`` and
    replaces ``a_in`` with a relay freshly created by the RelayLayer.

    For further information see the phd thesis.
    """
    self.logger.debug("Calling notify anchor")
    if not self.leaving:
        return

    previous_anchor_in = self.a_in
    if previous_anchor_in is not None:
        self.D.append(previous_anchor_in)
    self.a_in = self.relay_layer.new_relay()
Can be overridden to implement an underlying protocol timeout.
This method is automatically executed at the end of every timeout of this node class. It should be implemented with the timeout of the underlying protocol.
Example
See SortedListNode.
Expand source code
def original_timeout(self):
    """Can be overridden to implement the timeout of an underlying protocol.

    The default implementation does nothing.

    Example:
        See `SortedListNode`
    """
    return None
Should be overridden to remove relays from underlying protocol variables.
By default it removes the given relay id from the N attribute. This should be changed to a better implementation when implementing an underlying protocol.
Example
See the SortedListNode implementation.
Args
relay_id:RelayId- Defines the RelayId that should be removed from the variables.
Expand source code
def remove_from_original_variables(self, relay_id: RelayId):
    """Should be overridden to remove relays from underlying protocol variables.

    The default implementation calls ``remove`` on the ``N`` attribute with the given relay id.

    Example:
        See `SortedListNode` implementation.

    Args:
        relay_id (RelayId): Defines the RelayId that should be removed from the variables.
    """
    protocol_relays = self.N
    protocol_relays.remove(relay_id)
This method is called whenever the node receives an Action.
The method executes the method named in the received action if the node is staying, or reverses the connection if the node is leaving.
Args
action:Action- The received action that should be executed.
Expand source code
def replace_action(self, action):
    """This method is called whenever the node receives an Action.

    The lower-cased action type is used as the name of the method to execute. The action is executed when the
    node is staying, or when it is one of the actions that are also handled while leaving (``timeout``,
    ``reverse``, ``ask_to_reverse``, ``ask_to_reverse_anchor``, ``notify_anchor``). For
    ``ask_to_reverse_anchor`` the receiving relay of the action is passed as an additional last argument.

    Any other action arriving at a leaving node is not executed. Instead, for every RelayId among its
    parameters an ``ask_to_reverse`` action carrying the receiving relay is sent over that relay and the
    relay is deleted. Without a receiving relay only a warning is logged.

    Args:
        action (Action): The received action that should be executed.
    """
    action_name = action.action_type.lower()
    handled_while_leaving = ("timeout", "reverse", "ask_to_reverse", "ask_to_reverse_anchor", "notify_anchor")

    if not self.leaving or action_name in handled_while_leaving:
        # Work on a copy: appending the receiving relay must not alter the parameters of the Action object
        # itself, otherwise the extra argument would still be attached if the Action is looked at again.
        call_arguments = list(action.parameters)
        if action_name == "ask_to_reverse_anchor":
            call_arguments.append(action.receiving_relay)
        try:
            action_function = getattr(self, action_name)
        except AttributeError:
            self.logger.warning("Action with action type {} not found!".format(action_name))
            return
        if callable(action_function):
            action_function(*call_arguments)
        else:
            self.logger.warning("Action with action type {} is not callable or a function!".format(action_name))
        return

    receiving_relay = action.receiving_relay
    if receiving_relay is None:
        self.logger.warning("Receiving relay was empty for action {}".format(action))
        return

    for parameter in action.parameters:
        if isinstance(parameter, RelayId):
            # Separate name so the received action is not shadowed inside the loop.
            reverse_request = Action('ask_to_reverse', [receiving_relay])
            self.relay_layer.send(parameter, reverse_request)
            self.relay_layer.delete(parameter)
Should be overridden to reverse a specific relay reference.
The method should just send a node action from the underlying protocol over the given relay with the in_ref as parameter.
Example
See SortedListNode. Example for the linearize Action: self.call_method(relay_id, 'linearize', [self.in_ref])
Expand source code
def reversal_of_relay(self, relay_id: RelayId):
    """Should be overridden to reverse a specific relay reference.

    An override should send a node action of the underlying protocol over the given relay with ``in_ref`` as
    parameter. The default implementation does nothing.

    Example:
        See `SortedListNode`. Example for the linearize Action

            self.call_method(relay_id, 'linearize', [self.in_ref])
    """
    return None
Implements the reverse protocol method.
For further information see the phd thesis.
Args
out:RelayId- Defines a Relay.
Expand source code
def reverse(self, out: RelayId):
    """Implements the reverse protocol method.

    Staying node: ``reversal_of_relay`` is called for ``out`` and ``out`` is deleted.

    Leaving node:
        * No anchor (``a_out`` is None): a direct ``out`` becomes the new anchor. Otherwise an
          ``ask_to_reverse`` action carrying ``in_ref`` is sent over ``out`` and ``out`` is deleted. A dead
          ``out`` is reported with a warning before this decision.
        * ``out`` has the same target as ``a_out``: both relays are merged and, if the merge returns a
          relay, that relay becomes the anchor.
        * Otherwise an ``ask_to_reverse`` action carrying ``a_out`` is sent over ``out`` and ``out`` is
          deleted.

    For further information see the phd thesis.

    Args:
        out (RelayId): Defines a Relay.
    """
    self.logger.debug("Calling reverse with {}".format(out))

    if not self.leaving:
        self.reversal_of_relay(out)
        self.relay_layer.delete(out)
        return

    if self.a_out is None:
        if self.relay_layer.is_dead(out):
            # Reported through the node logger like every other diagnostic of this class (was a print).
            self.logger.warning("Reverse relay is not alive {}".format(out))
        if self.relay_layer.is_direct(out):
            self.a_out = out
            return
        # Not a direct relay: ask the other side to send a direct one.
        request = Action('ask_to_reverse', [self.in_ref])
    elif self.relay_layer.same_target(out, self.a_out):
        merged = self.relay_layer.merge([self.a_out, out])
        if merged:
            self.a_out = merged
        return
    else:
        request = Action('ask_to_reverse', [self.a_out])

    self.relay_layer.send(out, request)
    self.relay_layer.delete(out)
Should be overridden to send a state to a StateMonitor.
In this method a State object should be created. After that it should be sent with the send_state_to_monitor method.
Example
See the SortedListNode implementation.
Expand source code
def send_analyse_state(self):
    """Should be overridden to send a state to a StateMonitor.

    An override should create a State object and send it with the `send_state_to_monitor` method. The
    default implementation does nothing and returns None.

    Example:
        See `SortedListNode` implementation.
    """
    return None
Sends the given state to the StateMonitor.
The method sends the given state to the address defined in the ModuleConfig.
Args
state:Object- Defines the state that should be sent. This should be an object of a state class.
Expand source code
def send_state_to_monitor(self, state):
    """Sends the given state to the StateMonitor and returns its reply.

    A REQ socket is created from the node's ZeroMQ context and connected to the TCP address defined in
    ``ModuleConfig.STATE_MONITOR_ADDRESS``. The state is sent as a Python object and the call then waits for
    the reply of the monitor. The socket is closed in every case, also when connecting, sending or receiving
    raises.

    Args:
        state (Object): Defines the state that should be sent. This should be a object of a state class.

    Returns:
        The Python object the StateMonitor replied with.
    """
    socket = self._context.socket(zmq.REQ)
    try:
        # Set before any traffic so that closing never waits for undelivered messages, including on errors.
        socket.setsockopt(zmq.LINGER, 0)
        socket.connect("tcp://" + ModuleConfig.STATE_MONITOR_ADDRESS)
        socket.send_pyobj(state)
        return socket.recv_pyobj()
    finally:
        socket.close()
Completely shutdown the node.
The method stops all threads by setting the running state to False. After that it stops the RelayLayer and waits until the RelayLayer deleted all Relays.
Expand source code
def shutdown(self):
    """Completely shutdown the node.

    The running state is set to False first. In analyse mode the analyse state is sent once more. Then the
    RelayLayer is stopped, the timeout thread of the RelayLayer and the timeout thread of the node are
    joined, and finally the buffer is closed and its feeder thread is joined. The message thread is not
    joined here.
    """
    self.running = False

    if self.analyse_mode:
        self.send_analyse_state()

    relay_layer = self.relay_layer
    relay_layer.stop()
    relay_layer.timeout_thread.join()
    self.timeout_thread.join()

    node_buffer = self.buffer
    node_buffer.close()
    node_buffer.join_thread()
Starts the node by starting all threads.
The node gets started by starting the message thread and the timeout thread.
Expand source code
def start(self):
    """Starts the node by starting all threads.

    The message thread is started first, the timeout thread second.
    """
    self.logger.debug("Start node in start")
    for thread_attribute in ("message_thread", "timeout_thread"):
        getattr(self, thread_attribute).start()
Stopping the node so that the node should leave the system.
The method sets the leaving state to True and activates the leaving protocol of the node.
Expand source code
def stop(self):
    """Stopping the node so that the node should leave the system.

    Only the leaving state is set to True here; no thread is stopped by this method.
    """
    logger = self.logger
    logger.debug("Leaving the system")
    self.leaving = True
The timeout method is called periodically and corrects all values in the node.
The method corrects every value in the node or, if the node is leaving, prepares the node for leaving. For further information about the protocol see the PhD thesis by Setzer.
Expand source code
def timeout(self):
    """The Timeout method is called periodically and corrects all values in the node.

    Common part:
        * Relays of the protocol variables, ``in_ref``, ``a_out`` and ``a_in`` that no longer exist in the
          RelayLayer are dropped.
        * Every validated relay that is referenced nowhere (not in ``D``, the protocol variables, ``in_ref``,
          ``a_in`` or ``a_out``) is added to ``D``.
        * If ``in_ref`` is missing or not a sink, the old value (if any) goes to ``D`` and a new relay is
          created for it.

    Staying node: non-direct relays of the protocol variables as well as ``a_in`` and ``a_out`` are moved to
    ``D``; every relay in ``D`` without incoming connections is reversed via ``reversal_of_relay`` and
    deleted; finally ``original_timeout`` of the underlying protocol is executed.

    Leaving node: all relays of the protocol variables are moved to ``D``; ``a_in`` is replaced when it is
    missing or not a sink; ``a_out`` is moved to ``D`` when it has incoming connections, is not direct or is a
    sink; for every relay in ``D`` without incoming connections an ``ask_to_reverse`` carrying ``in_ref`` is
    sent and the relay is deleted. When ``D`` is empty and neither ``in_ref`` nor ``a_in`` has incoming
    connections the node shuts down. Otherwise, if an anchor exists, ``ask_to_reverse_anchor`` (only while
    ``a_in`` has no incoming connections) and ``notify_anchor`` are sent over ``a_out``.

    In analyse mode the analyse state is sent at the end and the node shuts down if that call returns a
    truthy value.

    For further information about the protocol see the phd thesis.
    """
    layer = self.relay_layer

    self.logger.debug("in_ref " + str(self.in_ref))
    self.logger.debug('a_in ' + str(self.a_in))
    self.logger.debug('a_out ' + str(self.a_out))

    # Snapshot before removing: get_relays_from_original_variables() may return the live list (N by
    # default) and removing from a list while iterating it skips the element after each removal.
    for relay in list(self.get_relays_from_original_variables()):
        if not layer.check_relay_exists(relay):
            self.remove_from_original_variables(relay)

    if self.in_ref is not None and not layer.check_relay_exists(self.in_ref):
        self.in_ref = None
    if self.a_out is not None and not layer.check_relay_exists(self.a_out):
        self.a_out = None
    if self.a_in is not None and not layer.check_relay_exists(self.a_in):
        self.a_in = None

    for relay in layer.get_validated_relays():
        if relay not in self.D and not self.check_in_original_variables(relay) and relay != self.in_ref \
                and relay != self.a_in and relay != self.a_out:
            self.D.append(relay)

    if self.in_ref is None or not layer.is_sink(self.in_ref):
        if self.in_ref is not None:
            self.D.append(self.in_ref)
        self.in_ref = layer.new_relay()

    if not self.leaving:
        for relay in list(self.get_relays_from_original_variables()):
            if not layer.is_direct(relay):
                self.remove_from_original_variables(relay)
                self.D.append(relay)

        # A staying node holds no anchor relays.
        if self.a_in is not None:
            self.D.append(self.a_in)
            self.a_in = None
        if self.a_out is not None:
            self.D.append(self.a_out)
            self.a_out = None

        for relay in self.D.copy():
            if not layer.has_incoming(relay):
                self.D.remove(relay)
                self.reversal_of_relay(relay)
                layer.delete(relay)

        time_start = time.time()
        self.original_timeout()
        self.logger.debug("Orig timeout time: {}".format(time.time() - time_start))
    else:
        for relay in list(self.get_relays_from_original_variables()):
            self.remove_from_original_variables(relay)
            self.D.append(relay)

        if self.a_in is not None and not layer.is_sink(self.a_in):
            self.D.append(self.a_in)
            self.a_in = layer.new_relay()
        if self.a_in is None:
            self.a_in = layer.new_relay()

        if self.a_out is not None:
            if layer.has_incoming(self.a_out) or not layer.is_direct(self.a_out) \
                    or layer.is_sink(self.a_out):
                self.D.append(self.a_out)
                self.a_out = None

        for relay in self.D.copy():
            if not layer.has_incoming(relay) and relay != self.a_out:
                layer.send(relay, Action('ask_to_reverse', [self.in_ref]))
                self.D.remove(relay)
                self.logger.debug("Delete relay: {}".format(relay))
                layer.delete(relay)

        if not self.D and not layer.has_incoming(self.in_ref) and not layer.has_incoming(self.a_in):
            self.shutdown()
            return

        # Four placeholders for four values; with three the incoming state of a_in was silently dropped.
        self.logger.debug("LEAVING CHECK: {} {} {} {}".format(self.node_id, len(self.D),
                                                             layer.has_incoming(self.in_ref),
                                                             layer.has_incoming(self.a_in)))

        if self.a_out is not None:
            if not layer.has_incoming(self.a_in):
                layer.send(self.a_out, Action("ask_to_reverse_anchor", [self.a_in]))
            layer.send(self.a_out, Action("notify_anchor", []))

    if self.analyse_mode:
        # Send local state to monitor
        done = self.send_analyse_state()
        if done:
            self.shutdown()