debug_pub = ctx.socket(zmq.PUB)
debug_pub.bind("tcp://127.0.0.1:%s" % debug_port)
zmq.proxy(xpub, xsub, debug_pub)
def send_message(pub, msg):
while True:
pub.publish(msg)
time.sleep(.1)
t = Thread(target=run_proxy, args=(ctx,), daemon=True)
t.start()
time.sleep(1)
test_sub = ctx.socket(zmq.SUB)
test_sub.connect("tcp://127.0.0.1:%s" % sub_port)
test_sub.setsockopt(zmq.SUBSCRIBE, b"")
publisher = Publisher("tcp://127.0.0.1:%s" % port, 'TestPub', ctx=ctx)
publisher.start()
name, topic, data = 'TestNode', 'testing', ['Raw', 'this', 'is', 'data']
msg = Envelope(topic, name, data)
sender_t = Thread(target=send_message, args=(publisher, msg), daemon=True)
sender_t.start()
i = 0
while i < 10:
try:
frames = test_sub.recv_multipart(zmq.NOBLOCK)
except zmq.error.Again:
frames = []