let publisher = zmq.socket('push');
publisher.setsockopt(zmq.ZMQ_SNDTIMEO, 1);
publisher.bindSync('tcp://*:5556');
publisher.send(message, null, (err) => {
if (err) {
console.log('Failed to send message (' + err + ')');
} else {
console.log('Message successfully sent via ZMQ. (res: ' + res + ')');
One thing you really need to understand about ZMQ is that it takes care of connections. Either it guarantees delivery (eventually), or it drops the message (based on socket types, and sometimes options). You will usually not receive an error in your callback saying "the message was dropped, feel free to retry".
The PUB socket simply publishes to whoever is listening (subscribers), and if nobody is listening, it just drops the message. This is by design.
The way you describe PUSH is also by design. If you don't trust the socket, my advice would be: don't use ZMQ. If you have a demonstrable issue with PUSH sockets that show they are broken (vis-à-vis their designed behavior), I'm sure the nice people at ZMQ would love to know, in which case do share!
Now there are some options you can use to tweak behaviors. For example, you can have a look at ZMQ_IMMEDIATE
. Read about all socket options here: http://api.zeromq.org/4-0:zmq-setsockopt
I hope this may help out somewhat. Good luck!
I will agree for the PUB/SUB pattern, no subscriber means no message shall be delivered.
I disagree however for the PUSH/PULL pattern, as in I have currently no control over buffering/memory, and thus cannot handle failure situations as needed by my requirements.
The most simple demonstrable issue will be if you just plain shut down your PULL socket, while the PUSH socket is still trying to send. What will happen is that messages are buffered in memory until the PULL socket is available again.
When it comes to a lot of traffic, this will - please correct me if I am wrong - result in noticeable memory growth, ultimately until the point of failure/crash. Depending on the timespan of the outage, this will have several issues:
Some requests may be so old, that they shall not be redelivered (because the client got its error-reply already)
Re-delivering all messages might lead to delays as the receiver side will have to process all of those (probably already obsolete) messages
The sending process might run out of memory and just crash/lose everything
I am not saying anything in the ZMQ lib is broken, however my situation will require appropriate handling of error situations which is currently not possible (and I am wondering if I am the only one in this regard? Are you saying ZMQ should not be used in production grade/enterprise environments?)
Of course only the application can tell what to do with undeliverable messages, only the application can have knowledge about this (are those fire&forget messages and I dont care about failed deliveries? Are those messages which must be guaranteed to be delivered exactly once? Are those messages which should be dropped after xxx unsuccessful deliveries? ... ) So it is nice that ZMQ is doing alot in the background and already handling a lot, the ultimate decision about what to do with failed deliveries however must be made within the application, and not in the transport layer library (in my humble opinion, that is).
As I see in the docs, the plain C library will have the ZMB_NOBLOCK flag, which would allow the PUSH socket to receive errors when the message can not be delivered. However, the node.js module will not allow me to use the flag, since it is already blocking in zmq_getsockopt
(binding.cc:1281).
As for the ZMQ_IMMEDIATE
flag (I yet have to check if it would help) it seems not to be in the constants defined in the node.js lib. Should I find out its integer value in the C libary headers and just supply the number value to setsockopt(), or is this flag unsupported?
As a sidenote: I am not bound to use the PUB/SUB or PUSH/PULL pattern. In fact I will try out REQ/RES and see how it goes when the receiver is unavailable. I am just trying to find the correct usage of the ZMQ lib for my requirements/scenario as described in the first post.
@patrickjane @lokhandeomkar @ronkorving How can I connect to multiple publishers at the same time to receive messages from both. In my code I m only able to receive the messages from the first publisher to which I connect.
Code:
#include <zmq.hpp>
#include
int main (int argc, char *argv[])
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5556");
subscriber.connect("tcp://localhost:5557");
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);//subscribe to all messages
// Process 10 updates
int update_nbr;
for (update_nbr = 0; update_nbr < 10 ; update_nbr++) {
zmq::message_t update;
subscriber.recv (&update);
//Prints only the data from publisher bound to port 5556
std::string updt = std::string(static_cast<char*>(update.data()), update.size());
std::cout << "Received Update/Messages/TaskList " << update_nbr <<" : "<< updt << std::endl;
return 0;