Commit c6a106ad authored by thorsten's avatar thorsten
Browse files

* Make sure that expired is only send once

parent 1ef582c0
......@@ -319,7 +319,7 @@ s_relay_protocol_send_event(relay_protocol_t *self,
va_end(args);
assert(rc==0);
if(self->verbose)
if(self->verbose)
zsys_debug("(Relay %s) Successfully Send event %d to application", self->name != NULL ? self->name : zuuid_str(self->uuid), event);
}else{
handled = -1;
......@@ -700,12 +700,11 @@ s_check_target(relay_protocol_t *self) {
assert(self->frontend);
relay_peer_t *target_as_peer = relay_target_peer(self->frontend->target);
if (!target_as_peer) return;
if (!target_as_peer || relay_peer_connection_state(target_as_peer) == RELAY_PEER_NONE) return;
time_t current_time = zclock_mono();
switch (relay_peer_expiration_state(target_as_peer)) {
case RELAY_PEER_NORMAL:
if (current_time >= relay_peer_evasive_at(target_as_peer))
relay_target_ping(self->frontend->target);
......@@ -713,11 +712,11 @@ s_check_target(relay_protocol_t *self) {
case RELAY_PEER_EVASIVE:
if (current_time >= relay_peer_expired_at(target_as_peer)) {
relay_target_timeout(self->frontend->target);
s_relay_protocol_send_event(self, RELAY_EVENT_EXPIRED, relay_peer_uuid(target_as_peer), NULL);
rmq_msg_t *expired = self->reply;
rmq_msg_set_id(expired, RMQ_MSG_EXPIRED);
relay_protocol_reply_all(self, &expired);
assert(relay_peer_expiration_state(target_as_peer) == RELAY_PEER_EXPIRED);
}
break;
case RELAY_PEER_EXPIRED:
......@@ -1003,6 +1002,7 @@ relay_protocol_handle_dealer(relay_protocol_t *self) {
}
else
if (rmq_msg_id (msg) == RMQ_MSG_EXPIRED) {
s_relay_protocol_send_event(self, RELAY_EVENT_EXPIRED, relay_peer_uuid(target_peer), NULL);
relay_target_probe_timeout(self->frontend->target);
relay_protocol_reply_all(self, &msg);
......@@ -1221,6 +1221,7 @@ s_retire_peer(relay_protocol_t *self, relay_peer_t *peer, zuuid_t *peer_uuid, bo
}else{
relay_handle_t *handle = relay_peer_get_handle(peer, peer_uuid);
relay_handle_set_status(handle, expired ? RELAY_HANDLE_EXPIRED : RELAY_HANDLE_NONE);
s_relay_protocol_send_event(self, RELAY_EVENT_EXPIRED, peer_uuid, NULL);
}
return 0;
......@@ -1238,24 +1239,23 @@ s_handle_peer_timeout(relay_peer_t *peer, void *arg)
if (relay_peer_expiration_state(peer) == RELAY_PEER_NORMAL && current_time >= relay_peer_evasive_at(peer))
{
relay_peer_ping(peer);
return true;
}
else
if (relay_peer_expiration_state(peer) == RELAY_PEER_EVASIVE && current_time >= relay_peer_expired_at(peer))
{
relay_peer_timeout(peer);
return true;
}
else
if (relay_peer_expiration_state(peer) == RELAY_PEER_EXPIRED){
relay_handle_t* handle = relay_peer_first_handle(peer);
zuuid_t* uuid;
while(handle){
uuid = relay_handle_uuid(handle);
s_relay_protocol_send_event(self, RELAY_EVENT_EXPIRED, uuid, NULL);
handle = relay_peer_next_handle(peer);
}
}
else
if (relay_peer_expiration_state(peer) == RELAY_PEER_EXPIRED){
return false;
}
return true;
......@@ -1589,14 +1589,10 @@ relay_protocol_handle_router(relay_protocol_t *self) {
assert(self);
rmq_msg_t *msg = self->current_msg;
if(rmq_msg_recv (msg, self->router) == -1){
return -1; // Interrupted
}
//if(self->verbose)zsys_debug("%s", zuuid_str(self->uuid));
//rmq_msg_print(msg);
// First frame is sender identity
const byte *peerid_data = zframe_data (rmq_msg_routing_id (msg));
size_t peerid_size = zframe_size (rmq_msg_routing_id (msg));
......@@ -1608,7 +1604,6 @@ relay_protocol_handle_router(relay_protocol_t *self) {
}
zuuid_t *uuid = zuuid_new_from (peerid_data+1);
//zuuid_set (uuid, peerid_data + 1);
// On GREETING we may create the peer if it's unknown
// On other commands the peer must already exist
......@@ -1620,10 +1615,13 @@ relay_protocol_handle_router(relay_protocol_t *self) {
//Every Message counts as ping!
relay_peer_refresh(peer);
//zsys_error("Got %s", zuuid_str(rmq_msg_sender_id(msg)));
zuuid_t* sender = zuuid_dup(rmq_msg_sender_id(msg));
assert(sender);
//zsys_error("Got %s", zuuid_str(sender));
/* Message is handled by the internanal automaton of the connection.
* Currently this basically counts the messages and checks for losses*
*
......@@ -1633,7 +1631,7 @@ relay_protocol_handle_router(relay_protocol_t *self) {
* TODO: Can we even loose messages over TCP ? */
if (!relay_peer_receive(peer, msg)) {
zsys_warning ("(Relay %s) messages lost from %s", self->name != NULL ? self->name : zuuid_str(self->uuid), zuuid_str(relay_peer_uuid (peer)));
peer_table_remove_peer (self->peers, peer);//This will create pressure
//peer_table_remove_peer (self->peers, peer);//This will create pressure
}
if(self->verbose)
......@@ -1741,11 +1739,13 @@ relay_protocol_handle_timeout(relay_protocol_t *self) {
assert(self);
if(self->type == OUTGOING_RELAY)
if(self->type == OUTGOING_RELAY){
s_check_target(self);
}
/* Handle timeout for connected peers */
if(self->peers)peer_table_maintain(self->peers, s_handle_peer_timeout, self);
if(self->peers)
peer_table_maintain(self->peers, s_handle_peer_timeout, self);
}
static
......@@ -2361,6 +2361,7 @@ relay_protocol_actor (zsock_t *pipe, void *args)
//IDK
}
}
/*
if(self->type == OUTGOING_RELAY
&& self->frontend
&& s_get_connection(self->frontend)== RELAY_PEER_ACCEPTED
......@@ -2374,9 +2375,9 @@ relay_protocol_actor (zsock_t *pipe, void *args)
if (zpoller_expired(temp_poller)){
break;
}
}
}
*/
if(self->verbose)
zsys_debug ("( Relay %s) was terminated",
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment