DEV Community

Discussion on: Quarks Replication for a scalable solution

Collapse
 
lucpattyn profile image
Mukit, Ataul

zeromq to the rescue with a very simple PUB SUB proxy :
stackoverflow.com/questions/425746...

Collapse
 
lucpattyn profile image
Mukit, Ataul

//
// Simple message queuing broker in C++
// Same as request-reply broker but using QUEUE device
//

include "zhelpers.hpp"

int main (int argc, char *argv[])
{
zmq::context_t context(1);

//  Socket facing clients
zmq::socket_t frontend (context, ZMQ_ROUTER);
frontend.bind("tcp://*:5559");

//  Socket facing services
zmq::socket_t backend (context, ZMQ_DEALER);
backend.bind("tcp://*:5560");

//  Start the proxy
zmq::proxy(static_cast<void*>(frontend),
           static_cast<void*>(backend),
           nullptr);
return 0;
Enter fullscreen mode Exit fullscreen mode

}

Collapse
 
lucpattyn profile image
Mukit, Ataul

// Broker

include

int main(int argc, char* argv[]) {

void* ctx = zmq_ctx_new();
assert(ctx);

void* frontend = zmq_socket(ctx, ZMQ_XSUB);
assert(frontend);
void* backend = zmq_socket(ctx, ZMQ_XPUB);
assert(backend);

int rc = zmq_bind(frontend, "tcp://:5570");
assert(rc==0);
rc = zmq_bind(backend, "tcp://
:5571");
assert(rc==0);

zmq_proxy_steerable(frontend, backend, nullptr, nullptr);

zmq_close(frontend);
zmq_close(backend);

rc = zmq_ctx_term(ctx);
return 0;
}

// PUB

include

include

using namespace std;
using namespace chrono;

int main(int argc, char* argv[])
{
void* context = zmq_ctx_new();
assert (context);
/* Create a ZMQ_SUB socket /
void *socket = zmq_socket (context, ZMQ_PUB);
assert (socket);
/
Connect it to the host

localhost, port 5571 using a TCP transport */
int rc = zmq_connect (socket, "tcp://localhost:5570");
assert (rc == 0);

while (true)
{
int len = zmq_send(socket, "hello", 5, 0);
cout << "pub len = " << len << endl;
this_thread::sleep_for(milliseconds(1000));
}
}

// SUB

include

include

using namespace std;

int main(int argc, char* argv[])
{
void* context = zmq_ctx_new();
assert (context);
/* Create a ZMQ_SUB socket /
void *socket = zmq_socket (context, ZMQ_SUB);
assert (socket);
/
Connect it to the host localhost, port 5571 using a TCP transport */
int rc = zmq_connect (socket, "tcp://localhost:5571");
assert (rc == 0);
rc = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);

while (true)
{
char buffer[1024] = {0};
int len = zmq_recv(socket, buffer, sizeof(buffer), 0);
cout << "len = " << len << endl;
cout << "buffer = " << buffer << endl;
}
}