添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

在众多消息队列中,最简单、灵活、快速的应该就是 nanomsg 吧, nanomsg 是 zeromq 作者重新用 C 语言重新实现的, 是对 zeromq 的经验教训的各种提炼和反思。

We add power by removing complexity rather than exposing new functionality.

-- zeromq doc

nanomq 提供了 6 种通信模式,也即所谓“扩展性协议”:

  • pipeline
  • reqrep
  • pubsub
  • survey
  • 这些扩展性协议是建立在传输层之上的,现在 nanomsg 提供了 3 种传输机制:

  • inproc(单进程内通信)
  • ipc(单机内多进程通信)
  • tcp(通过 tcp 协议的网络通信)
  • nanomsg 的所有操作都是基于不同类型的 Socket,而 Socket 的类型决定了 nanomsg 使用了哪种通信模式和传输机制。

    以下示例用 python 语言编写。

    安装 nanomsg

    sudo pacman -S nanomsg
    

    Pipeline(A One-Way Pipe)

    文件 pipeline.py 如下:

    import nnpy
    def producer(url, msg):
        sock = nnpy.Socket(nnpy.AF_SP, nnpy.PUSH)
        sock.bind(url)
        print(f'[Producer]: send "{msg}"')
        sock.send(msg)
    def consumer(url):
        sock = nnpy.Socket(nnpy.AF_SP, nnpy.PULL)
        sock.connect(url)
        while True:
            msg = sock.recv()
            print(f'[Consumer]: receive "{msg}"')
    if __name__ == '__main__':
        import sys
        assert len(sys.argv) >= 2
        if sys.argv[1] == 'producer':
            assert len(sys.argv) == 4
            producer(sys.argv[2], sys.argv[3])
        elif sys.argv[1] == 'consumer':
            assert len(sys.argv) == 3
            consumer(sys.argv[2])
        else:
            print('''Usage:
            python pipeline.py producer <url> <msg>
            python pipeline.py consumer <url>''')
    

    终端测试:

    # 终端1
    python pipeline.py consumer 'ipc:///tmp/pipeline.ipc'
    # 终端2
    python pipeline.py producer 'ipc:///tmp/pipeline.ipc' 'hello world'
    

    Request/Reply (I ask, you answer)

    文件 reqrep.py 如下:

    import nnpy
    def server(url):
        sock = nnpy.Socket(nnpy.AF_SP, nnpy.REP)
        sock.bind(url)
        while True:
            msg = sock.recv()
            print(f'[Server]: received "{msg}"')
            sock.send(msg)
            print(f'[Server]: sent "{msg}"')
    def client(url, msg):
        sock = nnpy.Socket(nnpy.AF_SP, nnpy.REQ)
        sock.connect(url)
        print(f'[Client]: send "{msg}"')
        sock.send(msg)
        msg = sock.recv()
        print(f'[Client]: received "{msg}"')
    if __name__ == '__main__':
        import sys
        assert len(sys.argv) >= 2
        if sys.argv[1] == 'server':
            assert len(sys.argv) == 3
            server(sys.argv[2])
        elif sys.argv[1] == 'client':
            assert len(sys.argv) == 4
            client(sys.argv[2], sys.argv[3])
        else:
            print('''Usage:
            python reqrep.py server <url>
            python reqrep.py client <url> <msg>''')
    

    终端测试:

    # 终端1
    python reqrep.py server 'ipc:///tmp/reqrep.ipc'
    # 终端2
    python reqrep.py client 'ipc:///tmp/reqrep.ipc' 'hello world'
    

    Pair (Two Way Radio)

    文件 pair.py 如下:

    import time
    import nnpy
    def node1(url):
        sock = nnpy.Socket(nnpy.AF_SP, nnpy.PAIR)
        sock.bind(url)
        name = 'Node1'
        while True:
            msg = sock.recv()
            print(f'[{name}] received {msg}')
            time.sleep(1)
            print(f'[{name}] send {name}')
            sock.send(name)
    def node2(url):
        sock = nnpy.Socket(nnpy.AF_SP, nnpy.PAIR)
        sock.connect(url)
        name = 'Node2'
        while True:
            print(f'[{name}] send {name}')
            sock.send(name)
            time.sleep(1)
            msg = sock.recv()
            print(f'[{name}] received {msg}')
    if __name__ == '__main__':
        import sys
        assert len(sys.argv) == 3
        if sys.argv[1] == 'node1':
            node1(sys.argv[2])
        elif sys.argv[1] == 'node2':
            node2(sys.argv[2])
        else:
            print('''Usage:
            python pair.py node1 <url>
            python pair.py node1 <url>''')
    

    终端测试:

    # 终端1
    python pair.py node1 'ipc:///tmp/pair.ipc'
    # 终端2
    python pair.py node2 'ipc:///tmp/pair.ipc' 'hello world'
    

    Pub/Sub (Topics & Broadcast)

    文件 pubsub.py 如下:

    import time
    import nnpy
    def publisher(url):
        sock = nnpy.Socket(nnpy.AF_SP, nnpy.PUB)
        sock.bind(url)
        while True:
            msg = str(time.time())
            print(f'[Publisher] publish {msg}')
            sock.send(msg)
            time.sleep(1)
    def subscriber(url, name):
        sock = nnpy.Socket(nnpy.AF_SP, nnpy.SUB)
        sock.setsockopt(nnpy.SUB, nnpy.SUB_SUBSCRIBE, "")
        sock.connect(url)
        while True:
            msg = sock.recv()
            print(f'[{name}] received {msg}')
    if __name__ == '__main__':
        import sys
        assert len(sys.argv) == 3
        if sys.argv[1] == 'publisher':
            publisher(sys.argv[2])
        elif sys.argv[1].startswith('subscriber'):
            subscriber(sys.argv[2], sys.argv[1])
        else:
            print('''Usage:
            python pubsub.py publisher <url>
            python pubsub.py subscriber<n> <url>''')
    

    终端测试:

    # 终端1
    python pubsub.py publisher 'ipc:///tmp/pubsub.ipc'
    # 终端2
    python pubsub.py subscriber1 'ipc:///tmp/pubsub.ipc'
    # 终端3
    python pubsub.py subscriber2 'ipc:///tmp/pubsub.ipc'
    # 终端4
    python pubsub.py subscriber3 'ipc:///tmp/pubsub.ipc'
    

    Survey (Everybody Votes)

    文件 survey.py 如下:

    import time
    import nnpy
    def surveyor(url):
        sock = nnpy.Socket(nnpy.AF_SP, nnpy.SURVEYOR)
        sock.bind(url)
        time.sleep(1)
        msg = str(time.time())
        print(f'[Surveyor] request {msg}')
        sock.send(msg)
        while True:
                msg = sock.recv()
                print(f'[Surveyor] receive {msg}')
            except nnpy.errors.NNError:
                break
    def respondent(url, name):
        sock = nnpy.Socket(nnpy.AF_SP, nnpy.RESPONDENT)
        sock.connect(url)
        while True:
            msg = sock.recv()
            print(f'[{name}] received {msg}')
            print(f'[{name}] response {msg}')
            sock.send(msg)
    if __name__ == '__main__':
        import sys
        assert len(sys.argv) == 3
        if sys.argv[1] == 'surveyor':
            surveyor(sys.argv[2])
        elif sys.argv[1].startswith('respondent'):
            respondent(sys.argv[2], sys.argv[1])
        else:
            print('''Usage:
            python survey.py surveyor <url>
            python survey.py respondent<n> <url>''')
    

    终端测试:

    # 终端1
    python survey.py respondent1 'ipc:///tmp/survey.ipc'
    # 终端2
    python survey.py respondent2 'ipc:///tmp/survey.ipc'
    # 终端3
    python survey.py respondent3 'ipc:///tmp/survey.ipc'
    # 终端4
    python survey.py surveyor 'ipc:///tmp/survey.ipc'
    

    Bus (Routing)

    文件 bus.py 如下:

    import time
    import nnpy
    def node(url, name, *other_urls):
        sock = nnpy.Socket(nnpy.AF_SP, nnpy.BUS)
        sock.bind(url)
        for url in other_urls:
            sock.connect(url)
        time.sleep(5)
        sock.setsockopt(nnpy.SOL_SOCKET, nnpy.RCVTIMEO, 5000)
        print(f'[{name}] send {name} onto bus')
        sock.send(name)
        while True:
                msg = sock.recv()
                print(f'[{name}] receive {msg} from bus')
            except nnpy.errors.NNError:
                break
    if __name__ == '__main__':
        import sys
        assert len(sys.argv) >= 4
        if sys.argv[1].startswith('node'):
            node(sys.argv[2], sys.argv[1], *sys.argv[3:])
        else:
            print('''Usage:
            python bus.py node<n> <url> <url>...''')
    

    终端测试:

    # 终端1
    python bus.py node1 'ipc:///tmp/node1.ipc' \
        'ipc:///tmp/node2.ipc' \
        'ipc:///tmp/node3.ipc' \
        'ipc:///tmp/node4.ipc'
    # 终端2
    python bus.py node2 'ipc:///tmp/node2.ipc' \
        'ipc:///tmp/node1.ipc' \
    	'ipc:///tmp/node3.ipc' \
    	'ipc:///tmp/node4.ipc'
    # 终端3
    python bus.py node3 'ipc:///tmp/node3.ipc' \
        'ipc:///tmp/node1.ipc' \
    	'ipc:///tmp/node2.ipc' \
    	'ipc:///tmp/node4.ipc'
    # 终端4
    python bus.py node4 'ipc:///tmp/node4.ipc' \
        'ipc:///tmp/node1.ipc' \
    	'ipc:///tmp/node2.ipc' \
    	'ipc:///tmp/node3.ipc'