Programmer's Blog

Programmer's reference

Category Archives: ZeroMQ

[ZeroMQ] Client Server C++ example without middleman

 

fig55

The Freelance Model Two: Brutal Shotgun Massacre

  • We set things up, connecting to all servers.
  • When we have a request, we blast it out as many times as we have servers.
  • We wait for the first reply, and take that.
  • We ignore any other replies

Server Program

//-----------------ZMQ_CPP_FREELANCE2_SERVER---------------
#include <zmq.hpp>
#include <zmsg.hpp>
#include <iostream>

int main(int argc, char *argv[])
{
    if (argc < 2) {
        std::cout << "Error: syntax should be " << argv[0] <<
        " <endpoint> \n";
        return 0;
    }

    zmq::context_t * ctx = new zmq::context_t(1);
    zmq::socket_t * server = new zmq::socket_t(*ctx, ZMQ_REP);
    server->bind(argv[1]);

    std::cout << "Service is ready at " << argv[1] << std::endl;
    while (1) {
        zmsg *msg = new zmsg(*server);
        if (!msg)
        break;

        assert(msg->parts() == 2);

        std::string identity = (char *) (*msg).pop_front().c_str();
        std::string sequence = (char *) (*msg).pop_front().c_str();

        zmsg *reply = new zmsg();
        reply->push_front(const_cast<char*>(identity.c_str()));
        reply->push_front(const_cast<char*>(sequence.c_str()));
        reply->send(*server);
    }

    if (s_interrupted)
        std::cout << "Interrupted \n";

        ctx->close();
    return 0;
}

Client Program

//---------------ZMQ_CPP_FREELANCE2_CLIENT------------
#include <zmq.hpp>
#include <zmsg.hpp>
#include <zhelpers.hpp>
#include <iostream>

#define GLOBAL_TIMEOUT 2500

typedef struct _flclient_t flclient_t;
flclient_t *flclient_new(void);
void flclient_destroy(flclient_t ** self_p);
void flclient_connect(flclient_t * self, char *endpoint);
zmsg *flclient_request(flclient_t * self, zmsg ** request_p);

struct _flclient_t {
    zmq::context_t * ctx;
    zmq::socket_t * socket;
    size_t servers;
    uint sequence;
};

flclient_t *flclient_new(void)
{
    flclient_t *self;
    self = (flclient_t *) calloc(1, sizeof(flclient_t));
    self->ctx = new zmq::context_t(1);
    self->socket = new zmq::socket_t(*(self->ctx), ZMQ_DEALER);
    return self;
}

void flclient_destroy(flclient_t ** self_p)
{
    assert(self_p);
    if (*self_p) {
        flclient_t *self = *self_p;
        (self->ctx)->close();
        free(self);
        *self_p = NULL;
    }
}

void flclient_connect(flclient_t * self, char *endpoint)
{
    assert(self);
    (*(self->socket)).connect(endpoint);
    self->servers++;
}

zmsg *flclient_request(flclient_t * self, zmsg ** request_p)
{
    int debug = 0;
    assert(self);
    assert(*request_p);
    zmsg *request = *request_p;

    char sequence_text[10];
    sprintf(sequence_text, "%u", ++self->sequence);
    request->push_front(sequence_text);
    request->push_front("");

    int server;
    for (server = 0; server < self->servers; server++) {
         zmsg *msg = new zmsg(*request);
         msg->send(*(self->socket));
    }

    zmsg *reply = NULL;
    std::string part1, part2, part3;
    part1.clear();
    part2.clear();
    part3.clear();
    uint64_t endtime = s_clock() + GLOBAL_TIMEOUT;
    while (s_clock() < endtime) {
        std::cout.clear();
        zmq::pollitem_t items[] = {
{
        *(self->socket), 0, ZMQ_POLLIN, 0}
};
    int rc = zmq::poll(items, 1, (endtime - s_clock()) * 1000);
    if (rc == -1)
            break;
    std::cout.clear();
    if (items[0].revents & ZMQ_POLLIN) {
        reply = new zmsg(*(self->socket));
        assert(reply->parts() == 3);

        part1 = (char *) reply->pop_front().c_str();
        part2 = (char *) reply->pop_front().c_str();
        part3 = (char *) reply->pop_front().c_str();

        int sequence_nbr = atoi(part3.c_str());

        if (sequence_nbr == self->sequence)
            break;
        reply->clear();
}
    }

    (**request_p).clear();
    return reply;
}

int main(int argc, char *argv[])
{
    if (argc == 1) {
        std::cout << "Error: syntax should be " << argv[0] <<
        " <endpoint>... \n";
        return 0;
    }

    flclient_t *client = flclient_new();
    int argn;
    for (argn = 1; argn < argc; argn++)
        flclient_connect(client, argv[argn]);

        int requests = 10000;
        uint64_t start = s_clock();
        while (requests--) {
            zmsg *request = new zmsg();
            request->push_front("random name");
            zmsg *reply = flclient_request(client, &request);
            if (!reply) {
                std::cout << "name service not availbale, aborting...\n";
                break;
            }
            reply->clear();
    }
    std::cout << "Average round trip cost: " << (int) (s_clock() -
        start) / 10 << " usec \n";

    flclient_destroy(&client);
    return 0;
}
Advertisements