Issue description

In a production environment running Windows Server 2016, we have found that a short-lived PUSH socket fails to send a single message most of the time. In most environments it is reliable and we have had a hard time reproducing the problem, but I now have test code that shows it on both Windows 10 and MacOS Sierra. We were using ZMQ_LINGER set to 10000, and ZMQ_IMMEDIATE was not enabled. The socket is created and configured, the message is sent non-blocking, and the socket is destroyed immediately. The thread where this happens also ends very shortly afterwards. The context is a singleton and is not destroyed while the process is running.

The message is delivered to the PULL socket unreliably. When it is not delivered, we observe both cases where no TCP connection is ever made and cases where the server starts the handshake and the client suddenly aborts it with a TCP RST packet, as captured in a Wireshark trace.

The documentation of ZMQ_LINGER leads me to believe the message should be delivered despite closing the socket:

Positive values specify an upper bound for the linger period in milliseconds. Pending messages shall not be discarded after a call to zmq_close(); attempting to terminate the socket's context with zmq_term() shall block until either all pending messages have been sent to a peer, or the linger period expires, after which any pending messages shall be discarded.

The documentation of ZMQ_IMMEDIATE makes me wonder if this is expected behavior:

By default queues will fill on outgoing connections even if the connection has not completed. This can lead to "lost" messages on sockets with round-robin routing (REQ, PUSH, DEALER). If this option is set to 1, messages shall be queued only to completed connections. This will cause the socket to block if there are no other connections, but will prevent queues from filling on pipes awaiting connection.

However, the PUSH socket here is connected to only one PULL socket, so there is no real round-robin routing involved. I have not found any documentation on the interaction between ZMQ_LINGER and ZMQ_IMMEDIATE.

Possibly Related Issues

Issue #1264 may be related: it also reports dropped messages on a short-lived socket even with linger set. JeroMQ Issue 624 seems to show the same TCP-level behavior that we observe.

Environment

  • libzmq version (commit hash if unreleased): 4.3.2
  • OS: Windows 10, Windows Server 2016, MacOS Sierra
  • Compiler: Visual Studio 2015, clang/Xcode
  • Minimal test code / Steps to reproduce the issue

    The following test code reproduces the issue on both Windows and MacOS, although it fails more often on my Windows machine. Defines at the top let you control 1) blocking vs. non-blocking sends, 2) whether ZMQ_IMMEDIATE is enabled, 3) whether the short-lived socket is created in a plain loop or in a separate thread, and 4) an optional sleep at the start of each iteration. This code assumes C++14 or higher.

    // #define BLOCKING
    // #define IMMEDIATE
    #define THREADED
    // #define SLEEP_EACH 20ms
    #include <atomic>
    #include <cassert>
    #include <chrono>
    #include <cstdlib>
    #include <iostream>
    #include <limits>
    #include <memory>
    #include <string>
    #include <thread>
    #include <vector>
    #include <zmq.h>

    int main()
    {
      using namespace std::chrono_literals;
      using std::chrono::steady_clock;
      std::atomic<bool> stopped{false};
      std::atomic<steady_clock::time_point> lastReceived{steady_clock::now()};
      std::atomic<unsigned long long>
        messagesReceived{0ull},
        messagesSent{0ull};
      constexpr unsigned long long TOTAL_MESSAGES = 1'000;
      unsigned long long lastMessage = std::numeric_limits<unsigned long long>::max();
      std::vector<unsigned long long> messageNumbers;  // only touched by the receiver thread
      auto ctx = zmq_ctx_new();

      // pull socket reading numbers
      std::thread receiver([&]() {
        auto socket = zmq_socket(ctx, ZMQ_PULL);
        int opt = 100'000;
        zmq_setsockopt(socket, ZMQ_RCVHWM, &opt, sizeof opt);
        zmq_bind(socket, "tcp://127.0.0.1:20000");
        char buf[256];
        while (!stopped) {
          int len = zmq_recv(socket, buf, sizeof buf - 1, 0);
          if (len > 0) {
            // zmq_recv reports the full message size even if it truncated it
            if (len > static_cast<int>(sizeof buf) - 1)
              len = static_cast<int>(sizeof buf) - 1;
            buf[len] = '\0';
            unsigned long long number = std::strtoull(buf, nullptr, 10);
            messageNumbers.push_back(number);
            lastMessage = number;
            if (++messagesReceived == TOTAL_MESSAGES)
              stopped.store(true);
            lastReceived.store(steady_clock::now());
          } else if (len == -1 && zmq_errno() == ETERM) {
            break;  // the context is being terminated
          }
        }
        zmq_close(socket);
      });

      // time for socket to get ready
      std::this_thread::sleep_for(2s);
      int rc = 0;
      for (unsigned long long i = 0ull; i < TOTAL_MESSAGES && rc != -1; ++i) {
    #ifdef THREADED
        std::thread t([&]() {
    #endif
    #ifdef SLEEP_EACH
          std::this_thread::sleep_for(SLEEP_EACH);
    #endif
          auto socket = zmq_socket(ctx, ZMQ_PUSH);
          int opt = 10'000;
          zmq_setsockopt(socket, ZMQ_SNDTIMEO, &opt, sizeof opt);
          opt = 30'000;
          zmq_setsockopt(socket, ZMQ_LINGER, &opt, sizeof opt);
    #ifdef IMMEDIATE
          opt = 1;
          zmq_setsockopt(socket, ZMQ_IMMEDIATE, &opt, sizeof opt);
    #endif
          zmq_connect(socket, "tcp://127.0.0.1:20000");
          std::string msg = std::to_string(i);
    #ifdef BLOCKING
          int blocking = 0;
    #else
          int blocking = ZMQ_DONTWAIT;  // same value as the deprecated ZMQ_NOBLOCK
    #endif
          rc = zmq_send(socket, msg.c_str(), msg.length(), blocking);
          if (rc == -1) {
            std::cerr << "Send failed: [" << zmq_errno() << "] " << zmq_strerror(zmq_errno()) << std::endl;
          }
          assert(rc != -1);
          ++messagesSent;
          zmq_close(socket);
    #ifdef THREADED
        });
        t.join();
    #endif
      }

      while (!stopped) {
        // if we haven't got a new message in 5 seconds, assume no more are coming
        if (steady_clock::now() - lastReceived.load() > 5s) {
          if (!stopped) {
            std::cerr << "No message in 5 seconds, stopping test early..." << std::endl;
          }
          stopped.store(true);
        }
        std::this_thread::sleep_for(100ms);
      }
      std::cout << "Sent " << messagesSent.load() << ", read " << messagesReceived.load()
                << ", lost " << (messagesSent.load() - messagesReceived.load()) << std::endl;
      //assert(messagesReceived.load() == TOTAL_MESSAGES);
      zmq_ctx_term(ctx);  // makes a blocked zmq_recv in the receiver return ETERM
      receiver.join();    // a joinable std::thread must not be destroyed
      return 0;
    }
    

    What's the actual result?

    In short, tests with ZMQ_IMMEDIATE disabled and no sleep often drop a few messages. With blocking sends and ZMQ_IMMEDIATE enabled, the test case never drops messages, but in the production environment that configuration still drops them unless we add something like a std::cout write after the send, in which case it never drops (perhaps because a syscall occurs?). Running the sender in a thread makes the dropping worse, but it is not necessary to get some drops. Adding a 20ms sleep seems to fix the issue regardless of the other settings; even a 1ms sleep seems to fix it (again, a syscall?).

    I ran each of these configurations three times. Here are the results for Windows 10 (1000 indicates a pass; anything lower is a failure with dropped messages):

    Blocking | Immediate | Threaded | Sleep | Received messages (out of 1000)

    Note that this uses the TCP transport, but over loopback, so bad connectivity should not be possible. The behavior matches what I see with linger set to 0. I cannot reproduce the issue with the inproc transport.

    Curiously, the sleep that fixes the issue is not between zmq_send and zmq_close, but before each socket is created.

    I also just noticed Issue #1922, of which this may be a duplicate, except that this test reproduces on Mac and does not use a low HWM.
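The test collects `messageNumbers` on the receiver side but only reports counts. A small helper like the hypothetical `missing_messages` below (a sketch, not part of the original test) could list exactly which sequence numbers were lost, for example to check whether drops cluster at particular points in a run:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: given how many sequence numbers were sent
// (0 .. total-1) and the numbers the PULL side actually received, return the
// numbers that never arrived, in ascending order. Duplicates and
// out-of-range values in `received` are ignored.
std::vector<unsigned long long> missing_messages(
    unsigned long long total, const std::vector<unsigned long long> &received) {
  std::vector<bool> seen(static_cast<std::size_t>(total), false);
  for (unsigned long long n : received)
    if (n < total)
      seen[static_cast<std::size_t>(n)] = true;
  std::vector<unsigned long long> missing;
  for (unsigned long long n = 0; n < total; ++n)
    if (!seen[static_cast<std::size_t>(n)])
      missing.push_back(n);
  return missing;
}
```

In the test above it would be called after `receiver.join()` as `missing_messages(TOTAL_MESSAGES, messageNumbers)`, since only then is it safe to read the receiver's vector from the main thread.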