Commit c653827e authored by Alexander Setzer's avatar Alexander Setzer
Browse files

Merge branch 'master' of git.cs.upb.de:crc-901/RelayMQ into intermediate

Conflicts:
	bindings/cpp/tests/messages/addressbook.pb.cc
parents f8cc6dd7 7934d7e9
<?xml version="1.0" encoding="UTF-8"?>
<module type="CPP_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/platform.h.in" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relaymq_selftest.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_auth.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/rmq_msg.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_crypto.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_source.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_event.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/application_template.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/librelaymq.pc.in" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_source.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_protocol.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_context.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/peer_table.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_crypto.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_event.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_identity.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_target.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/rmq_msg.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_peer.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_target.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_context.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_peer.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_handle.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_auth.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/peer_table.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_handle.c" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/relay_protocol.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/Findlibzmq.cmake" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/CMakeLists.txt" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/Findczmq.cmake" isTestSource="false" />
</content>
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module-library">
<library name="Header Search Paths">
<CLASSES>
<root url="file:///usr/lib/gcc/x86_64-linux-gnu/5/include" />
<root url="file:///usr/lib/gcc/x86_64-linux-gnu/5/include-fixed" />
<root url="file:///usr/include" />
<root url="file:///usr/local/include" />
</CLASSES>
<SOURCES>
<root url="file:///usr/lib/gcc/x86_64-linux-gnu/5/include" />
<root url="file:///usr/lib/gcc/x86_64-linux-gnu/5/include-fixed" />
<root url="file:///usr/include" />
<root url="file:///usr/local/include" />
</SOURCES>
</library>
</orderEntry>
</component>
</module>
\ No newline at end of file
......@@ -200,6 +200,18 @@ To receive a message use the method 'relay_recv':
For advanced behavior, we want to know when certain events (like a new connection) happen, but how do we get these events?
Using ZeroMQ PUB/SUB System, we can simply subscribe for them !
// Creates a socket that receives a message each time a new relay connects to "my_server"
zsock_t* sub = relay_subscribe(my_server, RELAY_EVENT_ACCEPT, NULL)
How do these events look like ? TBD
### Connecting to a Relay I
Now we know how to wait for messages, but how do we send ?
......
git stash
git pull
./autogen.sh
./configure
make
sudo make install
sudo ldconfg
cd ..
cd bindings
cd cpp
cmake ./
sudo make install
cd ..
cd ..
......@@ -94,12 +94,12 @@ file(GLOB TEST_SRCS RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
tests/RelayStressTests.cpp
tests/LoopTests.cpp
tests/ApplicationTests.cpp
tests/FiniteMessageTests.cpp
tests/IntroductionTest.cpp
tests/ProtobufferTest.cpp)
tests/IncomingRelayTests.cpp
)
find_package(Protobuf REQUIRED)
find_package(Protobuf)
FILE(GLOB BAR_PROTOS "tests/messages/*.proto")
......@@ -150,7 +150,15 @@ endforeach(testSrc)
#Install headers and the library
install(FILES
include/*.h
include/Relay.h
include/Message.h
include/Socket.h
include/RmqLoop.h
include/RmqPoller.h
include/Subject.h
include/ApplicationContext.h
include/Actor.h
include/LightweightRelay.h
DESTINATION include)
install(TARGETS relaymqcpp
LIBRARY DESTINATION "lib${LIB_SUFFIX}" # .so file
......
......@@ -21,11 +21,15 @@ private:
std::unordered_map<std::shared_ptr<IncomingRelay>, std::unique_ptr<CancelHandle>> cancels;
std::atomic_int subject_count;
std::atomic_bool started;
size_t timeout;
// verhindert, dass ein Objekt von außerhalb von N erzeugt wird.
ApplicationContext():
ApplicationContext(size_t timeout=5000) :
subject_count(0),
started(false)
started(false),
timeout(timeout)
{
/*...*/
}
......@@ -41,15 +45,10 @@ private:
void init();
public:
static ApplicationContext& get()
{
static ApplicationContext _instance;
return _instance;
}
static ApplicationContext& get();
/**
* Initializes the Application in the backgroung.
......@@ -57,10 +56,7 @@ public:
* Call this at the beginning of your main method.
*
*/
static void Init()
{
ApplicationContext::get();
}
static void Init(size_t timeout = 5000);
/**
* Starts the Application and blocks.
......@@ -112,6 +108,20 @@ public:
{
std::shared_ptr<T> ref = std::make_shared<T>(std::forward<Args...>(args...));
ApplicationContext::get().addSubject(ref);
//RelayRef out = OutgoingRelay::createOutgoingRelay(ref->getEndpoint());
return ref;
}
template<class T>
static RelayRef Create()
{
std::shared_ptr<T> ref = std::make_shared<T>();
ApplicationContext::get().addSubject(ref);
//RelayRef out = OutgoingRelay::createOutgoingRelay(ref->getEndpoint());
return ref;
}
......@@ -124,7 +134,7 @@ public:
*
* ref->Send("I'M BUSY")
* ApplicationContext::ExecuteLater([ref]()->void{
* /* This code is executed 1s laster
* //This code is executed 1s laster
* ref->Send("DONE")
* }
* );
......@@ -145,7 +155,7 @@ public:
void addRelay(std::shared_ptr<Subject> subject, std::shared_ptr<IncomingRelay> relay);
void cancleRelay(std::shared_ptr<IncomingRelay> relay);
void cancelRelay(std::shared_ptr<IncomingRelay> relay);
std::shared_ptr<TicketHandle> executeLater(std::function<void()> fun);
......
......@@ -23,6 +23,7 @@ class TimeoutException;
class RelayContext{
public:
RelayContext(){
zsys_set_linger(5000);
zsys_set_max_sockets(20000);
relay_context_init();
};
......@@ -46,6 +47,7 @@ class Relay: public Socket {
friend class LocalRelay;
private:
zsock_t* msgpipe;
protected:
Relay();
......@@ -77,15 +79,23 @@ class Relay: public Socket {
bool Send(std::string msg, std::vector<RelayRef> references);
bool Send(zmsg_t** msg, std::vector<RelayRef> references);
/**
* Closes the Relay. No more messages will be received or forwarded.
*/
void Close();
std::shared_ptr<Socket> listenFor(relay_event_type_t event_type);
/**
* Compares the (remote) target of this relay with the other's.
*
* @param other
* @return: True, if Relays have the same target
*/
bool Compare(const Relay& other);
bool Compare(const RelayRef other);
};
/**
* An Incoming Relay, which can receive message from other Relays.
*
......@@ -325,7 +335,7 @@ public:
*/
class PublicRelay: public IncomingRelay{
private:
protected:
PublicRelay(const std::string& endpoint);
public:
......
......@@ -3,35 +3,56 @@
#include "Relay.h"
#include "RmqLoop.h"
#include <czmq.h>
class ApplicationContext;
class Subject :
public IncomingRelay,
public PublicRelay,
public std::enable_shared_from_this<Subject>{
friend class ApplicationContext;
friend class ApplicationContext;
private:
zpoller_t* poller;
bool running;
std::shared_ptr<CancelHandle> handler;
protected:
Subject(): IncomingRelay("tcp://127.0.0.1:*"){
Subject(): PublicRelay("tcp://127.0.0.1:*"){
poller = zpoller_new(NULL);
running = true;
}
void add(std::shared_ptr<IncomingRelay> r);
std::shared_ptr<TicketHandle> executeLater(std::function<void()>&& fun);
virtual bool onTimeout() = 0;
virtual bool onMessage(RelayRef receiver, std::string msg, std::vector<RelayRef> refs) = 0;
virtual void onTimeout() = 0;
virtual void onMessage(RelayRef receiver, std::string msg, std::vector<RelayRef> refs) = 0;
virtual void onClose(RelayRef receiver){};
bool onMessageInternal(RmqLoop* loop, std::shared_ptr<Socket> socket);
bool onTimeoutInternal();
public:
void start();
/**
* Stops this Subject. No more messages will be received
*
* Note: Make sure all its Relays are closed
*/
void Stop();
/**
* Creates new incoming Relay.
*
* All messages for this Relay will be directed to this subject's 'onMessage'
*
* @return: Reference to new Relay
*/
RelayRef CreateNewIncoming();
virtual ~Subject();
......
#include <thread>
#include "../include/ApplicationContext.h"
......@@ -12,7 +13,7 @@ void ApplicationContext::addSubject(std::shared_ptr<Subject> subject) {
void ApplicationContext::removeSubject(std::shared_ptr<Subject> subject) {
for(std::shared_ptr<IncomingRelay> r : relays[subject]){
if(r.get() != nullptr){
cancleRelay(r);
cancelRelay(r);
}
}
subjects.erase(subject);
......@@ -27,15 +28,29 @@ void ApplicationContext::addRelay(std::shared_ptr<Subject> subject, std::shared_
relay->getRaw();
cancels[relay] = this->loop.add(relay,
[subject](RmqLoop* loop, std::shared_ptr<Socket> sock)->bool{
return subject->onMessageInternal(loop,sock);
});
if(subject->running)
return subject->onMessageInternal(loop,sock);
else
return true;
},
[](RmqLoop* loop, std::shared_ptr<Socket> sock)->bool{
return true;
},
[](RmqLoop* loop, std::shared_ptr<Socket> sock)->bool{
auto ref = std::static_pointer_cast<IncomingRelay>(sock);
ref->Close();
return true;
},
false
);
relays[subject].push_back(relay);
}
void ApplicationContext::cancleRelay(std::shared_ptr<IncomingRelay> relay) {
void ApplicationContext::cancelRelay(std::shared_ptr<IncomingRelay> relay) {
if(cancels[relay]){
cancels[relay]->Cancel();
cancels.erase(relay);
relay->Close();
}
}
......@@ -44,14 +59,15 @@ void ApplicationContext::start() {
{
std::vector<std::shared_ptr<Subject>> vec = {};
for(std::shared_ptr<Subject> s: subjects){
bool ok = s->onTimeout();
bool ok = s->onTimeoutInternal();
if(!ok) vec.push_back(s);
}
for(std::shared_ptr<Subject> s: vec){
this->removeSubject(s);
}
return true;
}
return !subjects.empty();
},
timeout
);
this->started = true;
this->loop.Start();
......@@ -61,7 +77,7 @@ void ApplicationContext::shutdown() {
this->loop.Stop();
}
std::shared_ptr<TicketHandle> ApplicationContext::executeLater(std::function<void()> fun) {
std::shared_ptr<TicketHandle> ApplicationContext::executeLater(std::function<void()> fun) {
return this->loop.addTicket(fun);
}
......@@ -69,3 +85,14 @@ void ApplicationContext::init() {
this->loop = RmqLoop();
this->started = false;
}
static ApplicationContext* _instance;
void ApplicationContext::Init(size_t timeout) {
_instance = new ApplicationContext(timeout);
}
ApplicationContext &ApplicationContext::get() {
return *_instance;
}
......@@ -27,6 +27,7 @@ std::string PublicRelay::getEndpoint() const {
Relay::~Relay() {
if(!closed)
Close();
relay_destroy(&this->relay);
}
Relay::Relay() {
......@@ -45,10 +46,11 @@ void Relay::setVerbose() {
}
void Relay::Close() {
assert(this->relay);
relay_destroy(&this->relay);
closed = true;
if(!closed){
assert(this->relay);
relay_close(this->relay);
closed = true;
}
}
std::shared_ptr<Socket> Relay::listenFor(relay_event_type_t event) {
......@@ -112,6 +114,10 @@ bool Relay::Send(zmsg_t** msg, std::vector<std::shared_ptr<Relay>> references){
}
bool Relay::Compare(const RelayRef other) {
return this->Compare(*other);
}
std::shared_ptr<LocalRelay> LocalRelay::createLocalRelay(std::weak_ptr<Relay> other) {
return std::shared_ptr<LocalRelay>(new LocalRelay(other));
......
......@@ -704,7 +704,9 @@ void RmqLoop::ResetTicket(void *c_handle) {
std::unique_ptr<CancelHandle>
RmqLoop::add(std::shared_ptr<Socket> socket, SimpleHandlerFct handler, SimpleStartFct onStart, SimpleCancelFct onCancel,
RmqLoop::add(std::shared_ptr<Socket> socket, SimpleHandlerFct handler,
SimpleStartFct onStart,
SimpleCancelFct onCancel,
bool cancel_loop_on_exit) {
zmutex_lock(this->mutex);
......
......@@ -6,11 +6,21 @@ void Subject::add(std::shared_ptr<IncomingRelay> r) {
}
bool Subject::onMessageInternal(RmqLoop *loop, std::shared_ptr<Socket> socket) {
std::shared_ptr<IncomingRelay> r = std::static_pointer_cast<IncomingRelay>(socket);
auto msg = r->ReceiveString();
this->onMessage(r,
msg.first,
msg.second);
zpoller_add(this->poller, socket->getRaw());
void* which = zpoller_wait(this->poller, 0);
while(which != NULL && which == socket->getRaw() && running){
std::shared_ptr<IncomingRelay> r = std::static_pointer_cast<IncomingRelay>(socket);
auto msg = r->ReceiveString();
try {
this->onMessage(r, msg.first, msg.second);
}catch (std::exception e){
puts(e.what());
running = false;
break;
}
which = zpoller_wait(this->poller, 0);
}
zpoller_remove(this->poller, socket->getRaw());
return true;
}
......@@ -18,14 +28,32 @@ bool Subject::onMessageInternal(RmqLoop *loop, std::shared_ptr<Socket> socket) {
Subject::~Subject() {
//ApplicationContext::get().removeSubject(this->shared_from_this());
//this->Close();
}
std::shared_ptr<TicketHandle> Subject::executeLater(std::function<void()>&& fun) {
return ApplicationContext::get().executeLater(std::forward<std::function<void()>>(fun));
}
void Subject::start() {
ApplicationContext::get().addSubject(this->shared_from_this());
bool Subject::onTimeoutInternal() {
try {
onTimeout();
}catch (std::exception e){
puts(e.what());
running = false;
}
return running;
}
void Subject::Stop() {
running = false;
this->Close();
}
RelayRef Subject::CreateNewIncoming() {
auto ref = PublicRelay::createPublicRelay("tcp://127.0.0.1:*");
ApplicationContext::get().addRelay(this->shared_from_this(), ref);
return ref;
}
......
......@@ -2,8 +2,7 @@
#define BOOST_TEST_MODULE ApplicationTests
#include <boost/test/unit_test.hpp>
#include <relaymq.h>
#include "../tests/messages/addressbook.pb.h"
#include <thread>
#include "../include/Relay.h"
#include "../include/Socket.h"
......@@ -23,15 +22,20 @@
int limit;
int curr;
std::shared_ptr<OutgoingRelay> theOtherGuy;
RelayRef theOtherGuy;
/**
* Wird 1x pro Sekunde aufgerufen
* Wird 1x pro Sekunde aufgerufen, wenn es keine Nachrichten gibt.
*
*
* @return true
* @return true, wenn alles OK ist
* false, wenn das Subject beendet werden soll (Selbstzerstörung)
*/
bool onTimeout(){
return (curr < limit);
void onTimeout(){
if (curr > limit){
theOtherGuy->Close();
this->Stop();
}
}
/**
......@@ -42,13 +46,13 @@
* @param refs: Ein Vector von Referenzen, die mitgeschickt wurden
* @return: True, falls das Relay weiterhin empfangen soll
*/
virtual bool onMessage(RelayRef receiver, std::string msg, std::vector<RelayRef> refs){
if(msg == "INIT"){
virtual void onMessage(RelayRef receiver, std::string msg, std::vector<RelayRef> refs){
if(msg == "INIT"){//Aus der MAIN
theOtherGuy = refs[0];
bool ok = theOtherGuy->Send("INTRODUCE", {receiver});
assert(ok);
}else
if(msg == "INTRODUCE"){
if(msg == "INTRODUCE"){//Vom anderen Subject
theOtherGuy = refs[0];
theOtherGuy->Send("PING",{});
} else
......@@ -63,7 +67,6 @@
theOtherGuy->Send("PING", {});
curr++;
}
return true;
}
};
......@@ -75,9 +78,10 @@
/*Boilerplate-Code, der im Hintergrund alles aufsetzt*/
ApplicationContext::Init();
/*Jedes Subject soll 1000 Nachrichten senden*/
int limit = 1000;
/* Erstellt das Subject */
/* Erstellt das Subject und setzt limit*/
auto ping = ApplicationContext::Create<PingPongSubject>(limit);
auto pong = ApplicationContext::Create<PingPongSubject>(limit);
......@@ -92,6 +96,8 @@
* 2) Das Zustellen von Nachrichten beginnt
*/
ApplicationContext::Start();
}
......
......@@ -7,7 +7,7 @@
#include "../include/Relay.h"
#include "../include/Socket.h"
#include "../include/ApplicationContext.h"
#include "../../../../../Work/Test/RelayMQ/bindings/cpp/include/ApplicationContext.h"