@Override public List<String> call() throws Exception { try (ZMQ.Socket subscriber = context.socket(ZMQ.SUB)) { subscriber.connect(endpoint); subscriber.subscribe(new byte[0]); for (int messageNum = 0; messageNum < receiveCount && !Thread.currentThread().isInterrupted(); messageNum++) { // Use trim to remove the tailing '0' character messages.add(subscriber.recvStr(0).trim()); return messages;
socket.subscribe(topic);
public Closeable subscribe(final String topic) {
final byte[] topicBytes = topic.getBytes();
subscriber.subscribe(topicBytes);
return new Closeable() {
@Override
public void close() throws IOException {
subscriber.unsubscribe(topicBytes);
subscriber.subscribe(topic.getBytes());
subscriber.subscribe("FAIL".getBytes("UTF-8"));
subscriber.subscribe("".getBytes());
subscriber.connect("tcp://localhost:5562");
sync.connect("tcp://localhost:5561");
/**
* Closes an existing socket and creates a new one, binding it to the list of active publisher endpoints.
private synchronized void reconnect() {
if (!doRefresh.getAndSet(false)) {
return;
closeSocket();
manager.reserve(socketId);
Context context = manager.getContext();
socket = context.socket(ZMQ.SUB);
SocketHelper.configure(socket, metadata);
socket.subscribe(EMPTY_BYTES); // receive all messages
for (SocketAddress address : addresses) {
socket.connect(address.toProtocolString());
// establish a socket for receiving control messages
controlSocket = manager.createControlSocket();
poller = context.poller();
poller.register(controlSocket, ZMQ.Poller.POLLIN);
poller.register(socket, ZMQ.Poller.POLLIN);
/**
* Closes an existing socket and creates a new one, binding it to the list of active publisher endpoints.
private synchronized void reconnect() {
if (!doRefresh.getAndSet(false)) {
return;
closeSocket();
manager.reserve(socketId);
Context context = manager.getContext();
socket = context.socket(ZMQ.SUB);
SocketHelper.configure(socket, metadata);
socket.subscribe(EMPTY_BYTES); // receive all messages
for (SocketAddress address : addresses) {
socket.connect(address.toProtocolString());
// establish a socket for receiving control messages
controlSocket = manager.createControlSocket();
poller = context.poller();
poller.register(controlSocket, ZMQ.Poller.POLLIN);
poller.register(socket, ZMQ.Poller.POLLIN);
/** * @param args public static void main(String args[]) { // Prepare our context and subscriber ZMQ.Context context = ZMQ.context(1); ZMQ.Socket subscriber = context.socket(ZMQ.SUB); subscriber.connect("tcp://localhost:5563"); subscriber.subscribe(new byte[0]); while (true) { ByteArrayInputStream stream = new ByteArrayInputStream(subscriber.recv(0)); WriteableImageOutput instance; try { instance = IOUtils.read(stream, WriteableImageOutput.class, "UTF-8"); System.out.println("Got URL: " + instance.url + " ( " + instance.stats.imageURLs + " ) "); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace();
@Override public boolean configure(Properties properties) { try { listeningSocket = ctx.newSocket(method, type, listen); listeningSocket.setImmediate(false); listeningSocket.setReceiveTimeOut(-1); listeningSocket.setHWM(hwm); if(type == Sockets.SUB){ listeningSocket.subscribe(new byte[] {}); boolean configured = super.configure(properties); stopPair = ctx.getPair(getName() + "/" + UUID.randomUUID()); return configured; } catch (org.zeromq.ZMQException e) { ZMQHelper.logZMQException(logger, "failed to start ZMQ input " + listen + ":", e); logger.catching(Level.DEBUG, e.getCause()); listeningSocket = null; return false;
/**
* Connect to remote ZMQ.PUB
* @param bind tcp://<ip>:<port>
* @throws ru.darklogic.jericho.common.BindFormatException
public void connect(String bind) throws BindFormatException {
System.out.print("Trying to connect to " + bind +" ");
if (! isBindGood(bind)){
throw new BindFormatException();
ZMQ.Context context = ZMQ.context(1);
sub = context.socket(ZMQ.SUB);
sub.connect(bind);
sub.subscribe(new byte[0]);
System.out.println("Done");
public void subscribe(String topic) {
subscriber.subscribe(topic.getBytes());
import org.zeromq.ZMQ; public class Subscriber { public static void main(String[] a){ final ZMQ.Context ctx = ZMQ.context(1); final ZMQ.Socket sub = ctx.socket(ZMQ.SUB); // sub.connect("tcp://localhost:6001"); sub.connect("ipc://001"); sub.subscribe("".getBytes()); while (true){ String msg = sub.recvStr(); System.out.println(msg);
@Override public void setup(OperatorContext ctx) context = ZMQ.context(1); subscriber = context.socket(ZMQ.SUB); subscriber.connect(url); subscriber.subscribe(filter.getBytes()); syncclient = context.socket(ZMQ.REQ); syncclient.connect(syncUrl); syncclient.send("".getBytes(), 0);
public ZMQ.Socket createControlSocket() {
ZMQ.Socket controlSocket = context.socket(ZMQ.SUB);
controlSocket.subscribe(EMPTY_BYTES);
controlSocket.connect("inproc://fabric3");
return controlSocket;
@Override public void setup(OperatorContext ctx) context = ZMQ.context(1); subscriber = context.socket(ZMQ.SUB); subscriber.connect(url); subscriber.subscribe(filter.getBytes()); syncclient = context.socket(ZMQ.REQ); syncclient.connect(syncUrl); syncclient.send("".getBytes(), 0);
public ZMQ.Socket createControlSocket() {
ZMQ.Socket controlSocket = context.socket(ZMQ.SUB);
controlSocket.setLinger(0);
controlSocket.subscribe(EMPTY_BYTES);
controlSocket.connect("inproc://fabric3");
return controlSocket;
public void setup()
context = ZMQ.context(1);
logger.debug("Subsribing on ZeroMQ");
subscriber = context.socket(ZMQ.SUB);
subscriber.connect("tcp://localhost:5556");
subscriber.subscribe("".getBytes());
syncclient = context.socket(ZMQ.REQ);
syncclient.connect("tcp://localhost:5557");
sendSync();
public static Socket subscribe(Socket socket, byte[] topic) {
socket.subscribe(topic);
return socket;