Commit 528769f2 authored by thorsten's avatar thorsten
Browse files

* Added relay_event_handler

parent c6a106ad
//
// Created by thorsten on 28.11.16.
//
#ifndef RELAYMQ_RELAY_EVENT_HANDLER_H
#define RELAYMQ_RELAY_EVENT_HANDLER_H
#ifdef __cplusplus
extern "C" {
#endif
#include "relay.h"
typedef struct _relay_event_handler_t relay_event_handler_t;
relay_event_handler_t*
relay_event_handler_new(relay_event_type_t event, ...);
void
relay_event_handler_destroy(relay_event_handler_t** self_p);
int
relay_event_handler_add(relay_event_handler_t* self,
relay_t* relay);
int
relay_event_handler_remove(relay_event_handler_t* self, relay_t* relay);
int
relay_event_handler_wait(relay_event_handler_t* self, int timeo);
int
relay_event_handler_next(relay_event_handler_t* self);
int
relay_event_handler_has_next(relay_event_handler_t* self);
relay_event_type_t
relay_event_handler_type(relay_event_handler_t* self);
relay_t*
relay_event_handler_source(relay_event_handler_t* self);
zsock_t*
relay_event_handler_sock(relay_event_handler_t* self);
#ifdef __cplusplus
}
#endif
#endif //RELAYMQ_RELAY_EVENT_HANDLER_H
......@@ -54,6 +54,7 @@ typedef struct _relay_identity_t relay_identity_t;
#include "relay_prelude.h"
#include "relay.h"
#include "relay_identity.h"
#include "relay_event_handler.h"
#endif
/*
......
......@@ -17,6 +17,7 @@ include_HEADERS = \
include/relay_prelude.h \
include/relay.h \
include/relay_identity.h \
include/relay_event_handler.h \
include/relaymq_library.h
src_librelaymq_la_SOURCES = \
......@@ -33,6 +34,7 @@ src_librelaymq_la_SOURCES = \
src/relay_crypto.c \
src/relay_protocol.c \
src/relay_auth.c \
src/relay_event_handler.c \
src/platform.h
src_librelaymq_la_CPPFLAGS = ${AM_CPPFLAGS}
......
//
// Created by thorsten on 28.11.16.
//
#include "relaymq_classes.h"
struct _relay_event_handler_t{
zsock_t* sock;
zpoller_t* poller;
zhashx_t* relays;
relay_event_t* event;
};
relay_event_handler_t*
relay_event_handler_new(relay_event_type_t event, ...){
relay_event_handler_t* self = (relay_event_handler_t*) zmalloc(sizeof(relay_event_handler_t));
assert(self);
self->sock = zsock_new(ZMQ_SUB);
assert(self->sock);
self->poller = zpoller_new(self->sock, NULL);
assert(self->poller);
self->event = relay_event_new();
assert(self->event);
va_list args;
va_start(args, event);
int current = event;
while (current != RELAY_EVENT_NONE){
char* event_c = zsys_sprintf("%i", current);
zsock_set_subscribe(self->sock, event_c);
current = va_arg(args, int);
zstr_free(&event_c);
}
va_end(args);
return self;
}
void
relay_event_handler_destroy(relay_event_handler_t** self_p){
if(*self_p){
relay_event_handler_t* self = *self_p;
zsock_destroy(&self->sock);
zpoller_destroy(&self->poller);
zhashx_destroy(&self->relays);
free(*self_p);
*self_p = NULL;
}
}
int
relay_event_handler_add(relay_event_handler_t* self,
relay_t* relay){
assert(self);
assert(relay);
return zsock_connect(self->sock, "%s-EVENTS", relay_get_local_endpoint(relay));
}
int
relay_event_handler_remove(relay_event_handler_t* self, relay_t* relay){
assert(self);
assert(relay);
return zsock_disconnect(self->sock, "%s-EVENTS", relay_get_local_endpoint(relay));
}
int
relay_event_handler_wait(relay_event_handler_t* self, int timeo){
int rc = 0;
zsock_t* which = (zsock_t*)zpoller_wait(self->poller, timeo);
if(which){
rc = relay_event_recv(self->event, which);
} else
if(zpoller_terminated(self->poller)){
rc = -1;
} else
if(zpoller_expired(self->poller)){
rc = -1;
}
return rc;
}
int
relay_event_handler_next(relay_event_handler_t* self){
return relay_event_handler_wait(self, 0);
}
int
relay_event_handler_has_next(relay_event_handler_t* self){
assert(self);
void* which = zpoller_wait(self->poller, 0);
return which ? 0 : 1;
}
relay_event_type_t
relay_event_handler_type(relay_event_handler_t* self){
assert(self);
return relay_event_type(self->event);
}
relay_t*
relay_event_handler_source(relay_event_handler_t* self){
assert(self);
return relay_event_relay(self->event);
}
zsock_t*
relay_event_handler_sock(relay_event_handler_t* self){
assert(self);
return self->sock;
}
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment