Commit 2e8fc4c3 authored by thorsten's avatar thorsten
Browse files

* Part II of merge

parent cc693936
......@@ -119,7 +119,6 @@ set (relaymq_headers
src/relay_source.h
src/relay_context.h
src/relay_crypto.h
src/relay_protocol.h
src/relay_auth.h
src/relay_broker.h
)
......@@ -143,7 +142,6 @@ set (relaymq_sources
src/relay_source.c
src/relay_context.c
src/relay_crypto.c
src/relay_protocol.c
src/relay_auth.c
src/relay_event_handler.c
src/relay_broker.c
......
......@@ -17,13 +17,16 @@
public:
CliqueSubject(int size): size(size){
CliqueSubject(int size): size(size), ready(false), also_ready(0){
}
private:
int size;
bool ready;
int also_ready;
std::vector<RelayRef> mySubjects = {};
/**
......@@ -33,15 +36,24 @@
*/
void onTimeout(){
zsys_error("Connected to %i relays", mySubjects.size());
for(RelayRef destination : mySubjects){
destination->Send("ASK-FOR-LIN", mySubjects);
destination->Send("INTRODUCE", {this->shared_from_this()});
double r = ((double) rand() / (RAND_MAX));
if( r > 0.95){
this->Stop();
if(!ready){
for (RelayRef destination : mySubjects) {
destination->Send("ASK-FOR-LIN", mySubjects);
destination->Send("INTRODUCE", {this->shared_from_this()});
}
}
if(mySubjects.size() == size+1 && !ready){
ready = true;
for (RelayRef destination : mySubjects) {
destination->Send("READY", {});
}
}
if(ready)
zsys_error("I'm ready and %i people are also ready", also_ready);
if(also_ready == size+1)
Stop();
}
/**
......@@ -82,8 +94,9 @@
newRelay->Close();
}
}
} else{
assert(false);
} else
if(msg == "READY"){
also_ready++;
}
}
......@@ -100,7 +113,7 @@
/*Boilerplate-Code, der im Hintergrund alles aufsetzt*/
ApplicationContext::Init(10000);
int cliqueSize = 10;
int cliqueSize = 5;
std::vector<RelayRef> members = {};
auto last_created = ApplicationContext::Create<CliqueSubject>(cliqueSize);
......
......@@ -36,6 +36,7 @@
theOtherGuy->Close();
this->Stop();
}
}
/**
......@@ -67,9 +68,6 @@
theOtherGuy->Send("PING", {});
curr++;
}
if(msg == "PONG"){
puts(msg.c_str());
}
}
void onClose(RelayRef r){
......
......@@ -325,6 +325,10 @@ RELAYMQ_EXPORT int
RELAYMQ_EXPORT int
relay_set_verbose (relay_t *self);
// Deny messages from given relay.
RELAYMQ_EXPORT const char*
relay_endpoint (relay_t *self);
// Self test of this class.
RELAYMQ_EXPORT void
relay_test (bool verbose);
......@@ -354,8 +358,7 @@ relay_context_init();
void
relay_context_terminate();
const char*
relay_get_local_endpoint (relay_t *self);
#ifdef __cplusplus
}
......
......@@ -32,7 +32,6 @@ src_librelaymq_la_SOURCES = \
src/relay_source.c \
src/relay_context.c \
src/relay_crypto.c \
src/relay_protocol.c \
src/relay_auth.c \
src/relay_broker.c \
src/relay_proto.c \
......
......@@ -3,7 +3,7 @@
//
#include "relaymq_classes.h";
#include "relaymq_classes.h"
......@@ -48,7 +48,7 @@ relay_broker_create_incoming(relay_broker_t* self,
const char* endpoint //Endpoint to listen to
){
assert(self);
int rc = zsock_send(self->broker, "sUps", "ADD INCOMING", zuuid, pipe, endpoint);
int rc = zsock_send(self->broker, "ssUp", "ADD INCOMING", endpoint, zuuid, pipe);
assert(rc == 0);
rc += zsock_recv(self->broker, "s", &self->ep);
......@@ -56,14 +56,38 @@ relay_broker_create_incoming(relay_broker_t* self,
return rc;
}
int
relay_broker_remove_incoming(relay_broker_t* self,
zuuid_t* zuuid, // The relay's uuid
zsock_t* pipe, //pipe for messages
const char* endpoint //Endpoint to listen to
){
assert(self);
int rc = zsock_send(self->broker, "sUps", "REMOVE INCOMING", zuuid, pipe, endpoint);
int rc = zsock_send(self->broker, "ss", "REMOVE INCOMING", endpoint);
assert(rc == 0);
return rc;
}
int
relay_broker_remove_outgoing(relay_broker_t* self,
const char* endpoint //Endpoint to listen to
){
assert(self);
int rc = zsock_send(self->broker, "ss", "REMOVE OUTGOING", endpoint);
assert(rc == 0);
return rc;
}
int
relay_broker_remove_public(relay_broker_t* self,
const char* endpoint //Endpoint to listen to
){
assert(self);
int rc = zsock_send(self->broker, "ss", "REMOVE PUBLIC", endpoint);
assert(rc == 0);
return rc;
......@@ -79,8 +103,9 @@ relay_broker_create_public(relay_broker_t* self,
const char* public_key
){
assert(self);
int rc = zsock_send(self->broker, "sUpsss", "ADD PUBLIC",
zuuid, pipe, endpoint, public_endpoint, public_key);
int rc = zsock_send(self->broker, "ssUpss", "ADD PUBLIC",
endpoint,
zuuid, pipe, public_endpoint, public_key);
assert(rc == 0);
rc += zsock_recv(self->broker, "ss", &self->ep, &self->pub_ep);
......@@ -97,8 +122,9 @@ relay_broker_create_outgoing(relay_broker_t* self,
const char* endpoint
){
assert(self);
int rc = zsock_send(self->broker, "sUps", "ADD OUTGOING",
zuuid, target, endpoint);
int rc = zsock_send(self->broker, "ssUp", "ADD OUTGOING",
endpoint,
zuuid, target);
assert(rc == 0);
rc += zsock_recv(self->broker, "s", &self->ep);
......@@ -126,11 +152,12 @@ relay_broker_public_endpoint(relay_broker_t* self){
#define RMQ_IDENTITY_LEN 32
typedef enum{
typedef
enum{
RMQ_ACK = 0,
RMQ_PING = 1,
RMQ_MSG = 2,
};
RMQ_MSG = 2
}type_t;
typedef struct{
zuuid_t *uuid;
......@@ -404,6 +431,9 @@ s_broker_new(zsock_t* pipe){
self->local_router = zsock_new(ZMQ_ROUTER);
assert(self->local_router);
self->gossip = zsock_new(ZMQ_REP);
assert(self->gossip);
self->events = zsock_new(ZMQ_PUB);
assert(self->events);
......@@ -413,7 +443,7 @@ s_broker_new(zsock_t* pipe){
self->pipe = pipe;
assert(self->pipe);
self->poller = zpoller_new(self->pipe, self->router, self->local_router, NULL);
self->poller = zpoller_new(self->router, self->local_router, self->pipe,NULL);
assert(self->poller);
self->relays = zhashx_new();
......@@ -425,6 +455,8 @@ s_broker_new(zsock_t* pipe){
self->header =s_header_new();
assert(self->header);
self->timeo = 5000;
return self;
}
......@@ -516,7 +548,7 @@ s_handle_gossip(broker_t *self) {
assert(msg);
assert(zmsg_pop(msg) == NULL);//valid request only contains "GET"
beacon_t* beacon= zhashx_lookup(self->beacons, address);
beacon_t* beacon= (beacon_t*)zhashx_lookup(self->beacons, address);
assert(beacon);
zmsg_pushstr(msg, beacon->endpoint);//Endpoint of protected socket
......@@ -542,18 +574,11 @@ static int
s_retire_relay(broker_t* self, relay_sock_t* relay){
assert(self);
assert(relay);
//Tell application
int rc = zsock_send(self->events, "si", relay->relay_endpoint, RELAY_EVENT_EXPIRED);
assert(rc == 0);
//This creates backpressure
rc = zsock_unbind(self->router, "%s", relay->relay_endpoint);
assert(rc == 0);
rc = zsock_unbind(self->local_router, relay->local_endpoint);
assert(rc == 0);
if(relay->state == RELAY_PEER_NORMAL){
//Tell application
int rc = zsock_send(self->events, "si", relay->relay_endpoint, RELAY_EVENT_EXPIRED);
assert(rc == 0);
}
relay->state = RELAY_PEER_EXPIRED;
return 0;
......@@ -586,8 +611,12 @@ s_handle_router(broker_t* self, zsock_t* router){
}
assert(key);
relay_sock_t* relay = zhashx_lookup(self->relays, key);
assert(relay);
relay_sock_t* relay = (relay_sock_t*)zhashx_lookup(self->relays, key);
if(!relay){
zsys_error("Connection to Relay at was already destroyed");
zsock_flush(router);
return -1;
}
int rc;
if(relay->outgoing){
......@@ -624,13 +653,17 @@ s_handle_timeout(broker_t* self){
relay_sock_t* curr = (relay_sock_t*)zhashx_first(self->relays);
while(curr){
int64_t now = zclock_mono();
if(curr->evasive_at > now && curr->state != RELAY_PEER_EXPIRED){
if(curr->evasive_at >= now && curr->state == RELAY_PEER_NORMAL){
s_header_set(self->header, curr);
self->header->id[0] = (byte)RMQ_PING;
int rc = s_header_send(self->header, curr->dealer, 0);
if(rc != 0){
rc = s_retire_relay(self, curr);
assert(rc == 0);
}else{
curr->evasive_at = now+self->timeo;
}
}else{
......@@ -687,8 +720,6 @@ relay_connect(relay_sock_t* self, relay_identity_t* identity) {
int rc = zsock_connect(self->dealer, "%s", self->endpoint);
assert(rc == 0);
self->outgoing = true;
return 0;
}
......@@ -714,7 +745,7 @@ s_add_outgoing(
const char* ep = relay_identity_endpoint(identity);
assert(ep);
int rc = zsock_bind(self->router, endpoint);
int rc = zsock_bind(self->router, "%s", endpoint);
assert(rc > 0);
endpoint = zsock_endpoint(self->router);
......@@ -799,7 +830,7 @@ s_remove_incoming(
int rc = zsock_unbind(self->router, "%s",relay->relay_endpoint);
assert(rc == 0);
rc = zsock_unbind(self->local_router, relay->local_endpoint);
rc = zsock_unbind(self->local_router, "%s", relay->local_endpoint);
assert(rc == 0);
zhashx_delete(self->relays, endpoint);
......@@ -825,10 +856,16 @@ s_remove_outgoing(
}
assert(rc == 0);
//This creates backpressure
rc = zsock_unbind(self->router, "%s", relay->relay_endpoint);
assert(rc == 0);
rc = zsock_unbind(self->local_router, "%s", relay->local_endpoint);
assert(rc == 0);
zhashx_delete(self->relays, endpoint);
relay_sock_destroy(&relay);
return 0;
}
......@@ -871,7 +908,7 @@ s_remove_public(
)
{
beacon_t* beacon = zhashx_lookup(self->beacons, public_endpoint);
beacon_t* beacon = (beacon_t*)zhashx_lookup(self->beacons, public_endpoint);
assert(beacon);
s_beacon_destroy(&beacon);
......@@ -903,11 +940,11 @@ s_handle_api(broker_t *self) {
char* public_endpoint = NULL;
char* public_key = NULL;
int rc = zsock_recv(self->pipe, "sUpsss",
int rc = zsock_recv(self->pipe, "ssUpss",
&command,
&endpoint,
&uuid,
&pipe_or_ident,
&endpoint,
&public_endpoint,
&public_key);
assert(rc == 0);
......@@ -932,7 +969,7 @@ s_handle_api(broker_t *self) {
uuid,
(relay_identity_t*) pipe_or_ident,
endpoint);
return 0;
return ok;
} else
if(streq(command, "REMOVE OUTGOING")){
int ok = s_remove_outgoing(self, endpoint);
......@@ -941,7 +978,7 @@ s_handle_api(broker_t *self) {
if(streq(command, "ADD PUBLIC")){
int ok = s_add_public(self,
uuid,
(zsock_t*) pipe,
(zsock_t*) pipe_or_ident,
endpoint,
public_endpoint,
public_key);
......@@ -969,7 +1006,7 @@ relay_broker(zsock_t* pipe, void* args ){
zsock_signal(pipe, 0);
while(!self->terminated){
zsock_t* which = (zsock_t*)zpoller_wait(self->poller, -1);
zsock_t* which = (zsock_t*)zpoller_wait(self->poller, (int) self->timeo);
if(which == self->router){
s_handle_router(self, self->router);
}else
......@@ -982,7 +1019,10 @@ relay_broker(zsock_t* pipe, void* args ){
if(which == self->pipe){
s_handle_api(self);
}else
{
if(zpoller_expired(self->poller)){
s_handle_timeout(self);
}
else {
self->terminated = true;
}
}
......@@ -1002,7 +1042,6 @@ relay_broker_test (bool verbose){
relay_broker_create_incoming(broker, uuid, back, "tcp://127.0.0.1:*");
const char *id = relay_broker_endpoint(broker);
relay_identity_t* local = relay_identity_new(id);
relay_identity_set_uuid(local, uuid);
......@@ -1027,7 +1066,7 @@ relay_broker_test (bool verbose){
char* msg = zstr_recv(front);
assert(streq(msg, "troll"));
relay_broker_remove_incoming(broker, uuid, back, id);
relay_broker_remove_incoming(broker, id);
zclock_sleep(1000);
......@@ -1037,7 +1076,7 @@ relay_broker_test (bool verbose){
zsock_t* sub = zsock_new(ZMQ_SUB);
assert(sub);
zsock_set_subscribe(sub, id);
zsock_set_subscribe(sub, id2);
rc = zsock_connect(sub, "%s", "inproc://RELAY-EVENTS");
assert(rc == 0);
......
......@@ -22,10 +22,14 @@ relay_broker_create_incoming(relay_broker_t* self,
const char* endpoint //Endpoint to listen to
);
int
relay_broker_delete_incoming(relay_broker_t* self,
const char* endpoint //Endpoint to listen to
);
int
relay_broker_remove_incoming(relay_broker_t* self,
zuuid_t* zuuid, // The relay's uuid
zsock_t* pipe, //pipe for messages
const char* endpoint //Endpoint to listen to
);
......@@ -38,6 +42,12 @@ relay_broker_create_public(relay_broker_t* self,
const char* public_key
);
int
relay_broker_remove_outgoing(relay_broker_t* self,
const char* endpoint //Endpoint to listen to
);
int
relay_broker_create_outgoing(relay_broker_t* self,
zuuid_t* zuuid,
......@@ -45,6 +55,12 @@ relay_broker_create_outgoing(relay_broker_t* self,
const char* endpoint
);
int
relay_broker_remove_public(relay_broker_t* self,
const char* endpoint //Endpoint to listen to
);
const char*
relay_broker_endpoint(relay_broker_t* self);
......
......@@ -290,9 +290,15 @@ relay_crypto_encrypt(relay_crypto_t* self,byte* key, zmsg_t** msg){
* Make single frame out of message to reduce overhead.
* Otherwise we would need to iterate ver each frame and everytime and had a lot more memory allocations
*/
#if (CZMQ_VERSION == 30002)
byte *buffer;
size_t size = zmsg_encode(*msg, &buffer);
clear = zframe_new(buffer, size);
#elif (CZMQ_VERSION > 30002)
clear = zmsg_encode(*msg);
#else
assert(false);//Incomapatible version, you need to ugrade
#endif
}
break;
}
......@@ -334,7 +340,16 @@ relay_crypto_decrypt(relay_crypto_t* self, byte* key, zmsg_t** msg){
case CLEAR:
cleartext = s_process_message (self, key, current);
assert(cleartext);
#if (CZMQ_VERSION == 30002)
real_msg = zmsg_decode(zframe_data(cleartext), zframe_size(cleartext));
#elif (CZMQ_VERSION > 30002)
real_msg = zmsg_decode(cleartext);
#else
assert(false);//Incomapatible version, you need to ugrade
#endif
break;
case ENCRYPTED://Frame is encrypted
......
......@@ -8,7 +8,9 @@ struct _relay_event_handler_t{
zsock_t* sock;
zpoller_t* poller;
zhashx_t* relays;
relay_event_t* event;
relay_t* relay;
int event;
};
relay_event_handler_t*
......@@ -19,22 +21,20 @@ relay_event_handler_new(relay_event_type_t event, ...){
self->sock = zsock_new(ZMQ_SUB);
assert(self->sock);
int rc = zsock_connect(self->sock, "%s", "inproc://RELAY-EVENTS");
assert(rc == 0);
self->relays = zhashx_new();
assert(self->relays);
self->poller = zpoller_new(self->sock, NULL);
assert(self->poller);
self->event = relay_event_new();
assert(self->event);
va_list args;
va_start(args, event);
int current = event;
while (current != RELAY_EVENT_NONE){
char* event_c = zsys_sprintf("%i", current);
zsock_set_subscribe(self->sock, event_c);
current = va_arg(args, int);
zstr_free(&event_c);
}
va_end(args);
return self;
......@@ -57,14 +57,32 @@ relay_event_handler_add(relay_event_handler_t* self,
relay_t* relay){
assert(self);
assert(relay);
return zsock_connect(self->sock, "%s-EVENTS", relay_get_local_endpoint(relay));
char* ep = (char *) relay_endpoint(relay);
assert(ep);
int rc = zhashx_insert(self->relays, ep, relay);
assert(rc == 0);
zsock_set_subscribe(self->sock, ep);
return rc;
}
int
relay_event_handler_remove(relay_event_handler_t* self, relay_t* relay){
assert(self);
assert(relay);
return zsock_disconnect(self->sock, "%s-EVENTS", relay_get_local_endpoint(relay));
char* ep = (char *) relay_endpoint(relay);
assert(ep);
zhashx_delete(self->relays, ep);
zsock_set_unsubscribe(self->sock, ep);
return 0;
}
int
......@@ -72,7 +90,19 @@ relay_event_handler_wait(relay_event_handler_t* self, int timeo){
int rc = 0;
zsock_t* which = (zsock_t*)zpoller_wait(self->poller, timeo);
if(which){
rc = relay_event_recv(self->event, which);
char* endpoint;
int event;
rc = zsock_recv(self->sock,"si", &endpoint, &event);
assert(rc == 0);
relay_t* relay = (relay_t*)zhashx_lookup(self->relays, endpoint);
assert(relay);
relay_event_handler_remove(self, relay);
self->event = event;
self->relay = relay;
} else
if(zpoller_terminated(self->poller)){
rc = -1;
......@@ -98,13 +128,13 @@ relay_event_handler_has_next(relay_event_handler_t* self){
relay_event_type_t
relay_event_handler_type(relay_event_handler_t* self){
assert(self);
return relay_event_type(self->event);
return (relay_event_type_t) self->event;
}
relay_t*
relay_event_handler_source(relay_event_handler_t* self){
assert(self);
return relay_event_relay(self->event);
return self->relay;
}