Commit aecdb06b authored by user's avatar user
Browse files

Merge branch 'master-test' of git.cs.upb.de:crc-901/RelayMQ

parents fd4259e0 a262442e
......@@ -142,7 +142,7 @@ set (relaymq_sources
src/relay_crypto.c
src/relay_protocol.c
src/relay_auth.c
)
src/relay_event_handler.c)
source_group("Source Files" FILES ${relaymq_sources})
if (NOT DEFINED BUILD_SHARED_LIBS)
SET(BUILD_SHARED_LIBS ON)
......
File mode changed from 100755 to 100644
<?xml version="1.0" encoding="UTF-8"?>
<module type="CPP_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/tests/ApplicationTests.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/tests/TimeoutTests.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/tests/RelayStressTests.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/tests/LoopTests.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/tests/IntroductionTest.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/tests/RelayTests.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/tests/IncomingRelayTests.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/include/Message.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/include/Subject.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/include/Relay.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/include/RelayEventHandler.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/include/ApplicationContext.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/include/RmqPoller.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/include/Socket.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/include/Actor.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/include/LightweightRelay.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/include/RmqLoop.h" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/Subject.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/RmqLoop.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/RmqPoller.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/LightweightRelay.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/Actor.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/ApplicationContext.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/Socket.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/Message.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/Relay.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/RelayEventHandler.cpp" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/CMakeLists.txt" isTestSource="false" />
</content>
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module-library">
<library name="Header Search Paths">
<CLASSES>
<root url="file:///usr/lib/gcc/x86_64-linux-gnu/5/include-fixed" />
<root url="file:///usr/lib/gcc/x86_64-linux-gnu/5/include" />
<root url="file:///usr/include" />
<root url="file:///usr/local/include" />
<root url="file://$MODULE_DIR$/../../include" />
</CLASSES>
<SOURCES>
<root url="file:///usr/lib/gcc/x86_64-linux-gnu/5/include-fixed" />
<root url="file:///usr/lib/gcc/x86_64-linux-gnu/5/include" />
<root url="file:///usr/include" />
<root url="file:///usr/local/include" />
<root url="file://$MODULE_DIR$/../../include" />
</SOURCES>
</library>
</orderEntry>
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CMakeWorkspace" PROJECT_DIR="$PROJECT_DIR$" />
<component name="ProjectLevelVcsManager" settingsEditedManually="false">
<OptionsSetting value="true" id="Add" />
<OptionsSetting value="true" id="Remove" />
<OptionsSetting value="true" id="Checkout" />
<OptionsSetting value="true" id="Update" />
<OptionsSetting value="true" id="Status" />
<OptionsSetting value="true" id="Edit" />
<ConfirmationsSetting value="0" id="Add" />
<ConfirmationsSetting value="0" id="Remove" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/cpp.iml" filepath="$PROJECT_DIR$/.idea/cpp.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/../.." vcs="Git" />
</component>
</project>
\ No newline at end of file
......@@ -43,7 +43,7 @@ include_directories(${RMQ_INCLUDE_DIR})
set(LIBS ${LIBS} ${RMQ_LIBRARY})
find_library (ZMQ_LIBRARY NAMES zmq libzmq REQUIRED)
include_directories (../../include/)
set (relaymqcpp_sources
src/Relay.cpp
......@@ -63,7 +63,10 @@ set (relaymqcpp_sources
src/Actor.cpp
include/Actor.h
src/LightweightRelay.cpp
include/LightweightRelay.h)
include/LightweightRelay.h
src/RelayEventHandler.cpp
include/RelayEventHandler.h
)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread")
......@@ -96,6 +99,7 @@ file(GLOB TEST_SRCS RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
tests/ApplicationTests.cpp
tests/IntroductionTest.cpp
tests/IncomingRelayTests.cpp
tests/TimeoutTests.cpp
)
......
......@@ -6,6 +6,8 @@
#include "RmqLoop.h"
#include "Relay.h"
#include "Subject.h"
#include "RelayEventHandler.h"
......@@ -13,10 +15,19 @@ class ApplicationContext {
private:
struct SubjectEntry{
std::shared_ptr<Subject> subject;
std::vector<std::unique_ptr<TicketHandle>> tickets;
std::unordered_map<std::shared_ptr<IncomingRelay>, std::unique_ptr<CancelHandle>> cancels;
std::unique_ptr<CancelHandle> event_cancel;
};
RmqLoop loop;
std::shared_ptr<RelayEventHandler> handler;
RelayContext ctx;
std::unordered_set<std::shared_ptr<Subject>> subjects;
std::unordered_map<std::shared_ptr<Subject>, std::vector<std::shared_ptr<IncomingRelay>>> relays;
std::unordered_map<std::shared_ptr<Subject>, std::unique_ptr<CancelHandle>> subject_cancels;
std::unordered_map<std::shared_ptr<Subject>, std::unique_ptr<TicketHandle>> tickets;
std::unordered_map<std::shared_ptr<IncomingRelay>, std::unique_ptr<CancelHandle>> cancels;
std::atomic_int subject_count;
......@@ -31,7 +42,8 @@ private:
started(false),
timeout(timeout)
{
/*...*/
handler = std::make_shared<RelayEventHandler>(RELAY_EVENT_EXPIRED);
assert(handler);
}
ApplicationContext( const ApplicationContext& ); /* verhindert, dass eine weitere Instanz via Kopier-Konstruktor erstellt werden kann */
......@@ -146,7 +158,6 @@ public:
return ApplicationContext::get().executeLater(std::forward<std::function<void()>>(fun));
}
~ApplicationContext() {}
void addSubject(std::shared_ptr<Subject> subject);
......@@ -159,7 +170,6 @@ public:
std::shared_ptr<TicketHandle> executeLater(std::function<void()> fun);
};
......
......@@ -45,6 +45,7 @@ class Relay: public Socket {
friend class IncomingRelay;
friend class OutgoingRelay;
friend class LocalRelay;
friend class RelayEventHandler;
private:
zsock_t* msgpipe;
......@@ -84,7 +85,7 @@ class Relay: public Socket {
*/
void Close();
std::shared_ptr<Socket> listenFor(relay_event_type_t event_type);
bool isClosed();
/**
* Compares the (remote) target of this relay with the other's.
......
//
// Created by thorsten on 28.11.16.
//
#ifndef RELAYMQCPP_RELAYEVENTHANDLER_H
#define RELAYMQCPP_RELAYEVENTHANDLER_H
#include <unordered_map>
#include "Socket.h"
#include "Relay.h"
class RelayEventHandler: public Socket {
private:
relay_event_handler_t* handler;
std::unordered_map<relay_t*, RelayRef> relays;
public:
RelayEventHandler(relay_event_type_t type);
~RelayEventHandler();
void Add(RelayRef relay);
void Remove(RelayRef relay);
bool hasNext();
std::pair<int, RelayRef> Next();
zsock_t* getRaw();
};
#endif //RELAYMQCPP_RELAYEVENTHANDLER_H
......@@ -4,6 +4,7 @@
#include "Relay.h"
#include "RmqLoop.h"
#include <czmq.h>
#include "RelayEventHandler.h"
class ApplicationContext;
......@@ -16,12 +17,16 @@ private:
zpoller_t* poller;
bool running;
std::shared_ptr<CancelHandle> handler;
std::shared_ptr<RelayEventHandler> events;
protected:
Subject(): PublicRelay("tcp://127.0.0.1:*"){
poller = zpoller_new(NULL);
running = true;
events = std::make_shared<RelayEventHandler>(RELAY_EVENT_EXPIRED);
assert(events);
}
void add(std::shared_ptr<IncomingRelay> r);
......@@ -30,7 +35,9 @@ protected:
virtual void onTimeout() = 0;
virtual void onMessage(RelayRef receiver, std::string msg, std::vector<RelayRef> refs) = 0;
virtual void onClose(RelayRef receiver){};
virtual void onClose(RelayRef receiver){
};
bool onMessageInternal(RmqLoop* loop, std::shared_ptr<Socket> socket);
......@@ -53,6 +60,7 @@ public:
* @return: Reference to new Relay
*/
RelayRef CreateNewIncoming();
virtual ~Subject();
......
......@@ -4,10 +4,28 @@
void ApplicationContext::addSubject(std::shared_ptr<Subject> subject) {
subject_count++;
subjects.emplace(subject);
relays[subject] = {};
addRelay(subject, subject);
subject_count++;
subjects.emplace(subject);
relays[subject] = {};
auto event_cancel = this->loop.add(subject->events,[subject](RmqLoop* the_loop, std::shared_ptr<Socket> sock)->bool{
while(subject->events->hasNext()) {
auto event = subject->events->Next();
switch (event.first) {
case RELAY_EVENT_EXPIRED:
subject->onClose(event.second);
event.second->Close();
default:
break;
}
}
return true;
}
);
subject_cancels[subject] = std::move(event_cancel);
addRelay(subject, subject);
}
void ApplicationContext::removeSubject(std::shared_ptr<Subject> subject) {
......@@ -16,6 +34,7 @@ void ApplicationContext::removeSubject(std::shared_ptr<Subject> subject) {
cancelRelay(r);
}
}
subject_cancels[subject]->Cancel();
subjects.erase(subject);
relays.erase(subject);
subject_count--;
......@@ -24,8 +43,8 @@ void ApplicationContext::removeSubject(std::shared_ptr<Subject> subject) {
}
}
void ApplicationContext::addRelay(std::shared_ptr<Subject> subject, std::shared_ptr<IncomingRelay> relay) {
relay->getRaw();
void ApplicationContext::addRelay(std::shared_ptr<Subject> subject, std::shared_ptr<IncomingRelay> relay)
{
cancels[relay] = this->loop.add(relay,
[subject](RmqLoop* loop, std::shared_ptr<Socket> sock)->bool{
if(subject->running)
......@@ -43,6 +62,7 @@ void ApplicationContext::addRelay(std::shared_ptr<Subject> subject, std::shared_
},
false
);
this->handler->Add(relay);
relays[subject].push_back(relay);
}
......@@ -69,6 +89,19 @@ void ApplicationContext::start() {
},
timeout
);
this->loop.add(this->handler,[this](RmqLoop* the_loop, std::shared_ptr<Socket> sock)->bool{
while(handler->hasNext()) {
auto event = handler->Next();
switch (event.first) {
case RELAY_EVENT_EXPIRED:
//puts("BLIB");
default:
break;
}
}
return true;
}
);
this->started = true;
this->loop.Start();
}
......
......@@ -10,7 +10,7 @@ PublicRelay::PublicRelay(const std::string &endpoint) {
if(!this->relay){
throw BadEndpointException(endpoint);
}else{
closed= false;
}
}
......@@ -50,14 +50,17 @@ void Relay::Close() {
assert(this->relay);
relay_close(this->relay);
closed = true;
assert(closed = relay_closed(relay));
}
}
std::shared_ptr<Socket> Relay::listenFor(relay_event_type_t event) {
zsock_t* sock = relay_subscribe(this->relay, event, NULL);
return std::make_shared<ZmqSocket>(sock);
bool Relay::isClosed() {
return closed;
}
bool Relay::Compare(const Relay& other) {
return relay_cmp(this->relay, other.relay);
}
......@@ -108,7 +111,6 @@ bool Relay::Send(zmsg_t** msg, std::vector<std::shared_ptr<Relay>> references){
zlistx_add_end(list, other->relay);
}
int rc = relay_send(this->relay, *msg, (void *) IDENTITY_LIST, list, NULL);
assert(rc == 0);
zlistx_destroy(&list);
return (rc==0);
......
//
// Created by thorsten on 28.11.16.
//
#include "../include/RelayEventHandler.h"
RelayEventHandler::RelayEventHandler(relay_event_type_t type) {
handler = relay_event_handler_new(type, NULL);
relays = {};
assert(handler);
}
void RelayEventHandler::Add(RelayRef relay) {
assert(handler);
int rc = relay_event_handler_add(handler, relay->relay);
relays.emplace(relay->relay, relay);
assert(rc == 0);
}
void RelayEventHandler::Remove(RelayRef relay) {
assert(handler);
int rc = relay_event_handler_remove(handler, relay->relay);
relays.erase(relay->relay);
assert(rc == 0);
}
bool RelayEventHandler::hasNext() {
return relay_event_handler_has_next(handler) == 0;
}
std::pair<int, RelayRef> RelayEventHandler::Next() {
int rc = relay_event_handler_next(handler);
if(rc == 0){
int event = relay_event_handler_type(handler);
relay_t* source = relay_event_handler_source(handler);
assert(source);
RelayRef r = this->relays[source];
return std::make_pair(event, r);
} else{
return std::make_pair(0, nullptr);
}
}
zsock_t *RelayEventHandler::getRaw() {
return relay_event_handler_sock(handler);
}
RelayEventHandler::~RelayEventHandler() {
relay_event_handler_destroy(&handler);
}
......@@ -13,6 +13,16 @@ bool Subject::onMessageInternal(RmqLoop *loop, std::shared_ptr<Socket> socket) {
auto msg = r->ReceiveString();
try {
this->onMessage(r, msg.first, msg.second);
while (!msg.second.empty())
{
RelayRef rerf = msg.second.back();
msg.second.pop_back();
if(rerf.use_count() > 1){
this->events->Add(rerf);
}else{
rerf->Close();
}
}
}catch (std::exception e){
puts(e.what());
running = false;
......@@ -58,4 +68,3 @@ RelayRef Subject::CreateNewIncoming() {
......@@ -36,6 +36,11 @@
for(RelayRef destination : mySubjects){
destination->Send("ASK-FOR-LIN", mySubjects);
destination->Send("INTRODUCE", {this->shared_from_this()});
double r = ((double) rand() / (RAND_MAX));
if( r > 0.95){
this->Stop();
}
}
}
......@@ -82,6 +87,10 @@
}
}
void onClose(RelayRef r){
puts("Relay failed");
}
};
BOOST_AUTO_TEST_CASE(clique_introduction_test ){
......@@ -89,7 +98,7 @@
/*Boilerplate-Code, der im Hintergrund alles aufsetzt*/
ApplicationContext::Init(5000);
ApplicationContext::Init(10000);
int cliqueSize = 10;
std::vector<RelayRef> members = {};
......
#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_MODULE ApplicationTests
#include <boost/test/unit_test.hpp>
#include <relaymq.h>
#include <thread>
#include "../include/Relay.h"
#include "../include/Socket.h"
#include "../include/ApplicationContext.h"
#include "../include/Subject.h"
class PingPongSubject: public Subject{
public:
PingPongSubject(int limit):
limit(limit),
curr(0){
}
private:
int limit;
int curr;
RelayRef theOtherGuy;
/**
* Wird 1x pro Sekunde aufgerufen, wenn es keine Nachrichten gibt.
*
*
* @return true, wenn alles OK ist
* false, wenn das Subject beendet werden soll (Selbstzerstörung)
*/
void onTimeout(){
if (curr > limit){
theOtherGuy->Close();
this->Stop();
}
}
/**
* Wird aufgerufen, wenn eine Nachricht auf eines Relays, die diesem Subject gehören, gesendet wird
*
* @param receiver: Das Relay, das die Nachricht empfangen hat
* @param msg: Die Nachricht als String
* @param refs: Ein Vector von Referenzen, die mitgeschickt wurden
* @return: True, falls das Relay weiterhin empfangen soll
*/
virtual void onMessage(RelayRef receiver, std::string msg, std::vector<RelayRef> refs){
if(msg == "INIT"){//Aus der MAIN
theOtherGuy = refs[0];
bool ok = theOtherGuy->Send("INTRODUCE", {receiver});
assert(ok);
}else
if(msg == "INTRODUCE"){//Vom anderen Subject
theOtherGuy = refs[0];
theOtherGuy->Send("PING",{});
} else
if(msg == "PING"){
puts(msg.c_str());
theOtherGuy->Send("PONG", {});
curr++;
}
else
if(msg == "PONG"){
puts(msg.c_str());
theOtherGuy->Send("PING", {});
curr++;
}
if(msg == "PONG"){
puts(msg.c_str());
}
}
void onClose(RelayRef r){
zsys_debug("My limit id %i", limit);
}
};
BOOST_AUTO_TEST_CASE(ping_pong_test ){
puts("=============== ping_pong_test() ==================");
/*Boilerplate-Code, der im Hintergrund alles aufsetzt*/
ApplicationContext::Init(100);
/*Jedes Subject soll 1000 Nachrichten senden*/
int limit = 1000;
/* Erstellt das Subject und setzt limit*/
auto ping = ApplicationContext::Create<PingPongSubject>(limit);
auto pong = ApplicationContext::Create<PingPongSubject>(limit*1000);
/*
* Sendet Nachricht und Referenz an das Subject
*/
pong->Send("INIT", {ping});
/*
* Startet die Application:
* 1) Die main-Methode blockiert hier bis alle Subjects gelöscht sind
* 2) Das Zustellen von Nachrichten beginnt
*/
ApplicationContext::Start();
}