文章
问答
冒泡
ZeroMQ浅谈

ZeroMQ(也拼写为 ØMQ、0MQ 或 ZMQ)是一个高性能异步消息网络库,旨在用于分布式或并发程序中。它提供了一个消息队列,但不同于其他的面向消息的中间件,ZeroMQ 系统可以在没有专用的消息代理节点的情况下运行。

ZeroMQ支持多种通信模式,例如发布/订阅、请求/回复、客户端/服务器等,且也支持多种传输方式,例如TCP、进程内、进程间、广播、WebSocket 等。

ZeroMQ支持多种流行的编程语言集成使用。

Socket API

Sockets是一套网络编程的API,它并不是某种协议,它是基于TCP/IP的标准网络API,通常称作"套接字",Socket处于网络协议的传输层。ZeroMQ 在传统套接字之上提供了更高层次的抽象,它简化了通信模式和连接管理,提供了一套与套接字很相似的 API 。

ZeroMQ sockets生命周期

ZeroMQ 套接字的生命周期分为四个部分:

1)创建和销毁sockets

2)配置sockets,并在必要的时候执行检查

3)创建与sockets之间的 ZeroMQ 连接

4)在套接字上写入和接收消息,传输数据

队列消息数量上限

ZeroMQ对每种类型的套接字正在排队待处理的消息数量设置的一个最大上限。

当达到此消息数量上限时,套接字会进入静默状态(异常状态),ZeroMQ会根据套接字类型制定的策略,采取比如阻止或丢弃已发送的消息。

消息模式

Request-reply

请求-回复模式适用于各种面向服务的架构。

REQ socket

客户端使用REQ socket向服务端发送请求和接收响应。REQ socket仅支持交替的发送和后继接收调用的顺序。REQ socket可以连接REP socketROUTER socket

当服务不可用的时候,send操作会阻塞直到至少存在一个服务可用为止,REQ socket不会丢弃任何消息。

兼容的套接字(可连接的) REP,ROUTER
方向 双向
发送/接收模式 发送,接收,发送,接收,...
静默期间的动作 阻塞

REP socket

服务端使用REP socket接收请求并响应客户端。

兼容的套接字(可连接的) REQ,DEALER
方向 双向
发送/接收模式 接收,发送,接收,发送,...
静默期间的动作 上一个连接

DEALER socket

DEALER socket使用循环算法发送和接收消息,它是可靠的,不会丢弃消息。作为REQ的异步方案,用以与REP或ROUTER服务器进行通信的客户端。

DEALER socket连接到REP socket的时候,发送的消息必须包含一个空帧作为消息的第一部分(分隔符),后面再跟上一个或多个正文部分。

通常用于构建复杂的消息传递拓扑结构。

兼容的套接字(可连接的) ROUTER,REP,DEALER
方向 双向
发送/接收模式 无限制
静默期间的动作 阻塞

ROUTER socket

ROUTER socketREP socket的异步方案,通常用于与DEALER客户端进行交互通信。

兼容的套接字(可连接的) DEALER,REQ,ROUTER
方向 双向
发送/接收模式 无限制
静默期间的动作 丢弃

Pub-sub

发布-订阅模式是一种从单个发布者到多个订阅者的一对多数据分发。

Topic使用的前置匹配的模式,比如:当订阅了topic时,topictopic/subtopictopical都可以收到消息,但topiTOPIC收不到消息。

PUB socket

发布者使用PUB socket分发消息。这种类型的socket不能接收消息。

当此种类型的socket发送的消息超过了订阅者所能承受的上限时,后继再发送给此订阅者的消息都将被抛弃,直到订阅者恢复正常的处理能力。此种类型的socket的send()方法不会阻塞。

兼容的套接字(可连接的) SUB,XSUB
方向 单向
发送/接收模式 只能发送
静默期间的动作 丢弃

SUB socket

订阅者使用此种类型的socket订阅消息。

兼容的套接字(可连接的) PUB,XPUB
方向 单向
发送/接收模式 只能接收
静默期间的动作 丢弃

XPUB socket

此种socket类型可以以传入消息的形式接受来自对方的订阅,订阅(取消订阅)的消息是在订阅(取消订阅)topic前面增加订阅(取消订阅)标识:1(代表订阅),0(代表取消订阅),其他功能同PUB socket

兼容的套接字(可连接的) ZMQ_SUB,ZMQ_XSUB
方向 单向
发送/接收模式 发送消息,接受订阅
静默期间的动作 丢弃

XSUB socket

此种socket类型可以通过发送订阅消息来达到订阅或取消订阅的目的,订阅(取消订阅)的消息是在订阅(取消订阅)topic前面增加订阅(取消订阅)标识:1(代表订阅),0(代表取消订阅),其他功能同SUB socket

兼容的套接字(可连接的) ZMQ_PUB,ZMQ_XPUB
方向 单向
发送/接收模式 接收消息,发送订阅
静默期间的动作 丢弃

Pipeline

管道模式主要用于任务分发。比如一个或多个主节点向多个工作节点推送任务。工作节点将结果再推送给一个或多个主节点。这种模式在很大程度上是可靠的,因为他不会丢弃任何消息,除非某节点异常掉线了。节点可以随时加入。

PUSH socket

PUSH socket可以与多个PULL socket通信,但只是用于发送消息。在无可用节点连接的情况下,send()方法会进入阻塞状态。

在发送消息时使用循环算法,只会将消息发送给其中的某一个工作节点。

兼容的套接字(可连接的) PULL
方向 单向
发送/接收模式 发送
静默期间的动作 阻塞

PULL socket

PULL socket可以与多个PUSH socket通信,从这些socket中接收消息。

在接收消息时使用公平排队算法,顺序处理消息。

兼容的套接字(可连接的) PUSH
方向 单向
发送/接收模式 接收
静默期间的动作 阻塞

Exclusive pair

PAIR 不是通用套接字,用于两个稳定的节点进行通信。这通常限制 PAIR 只在单个进程中使用,用于线程间通信。

PAIR socket

PAIR socket只能同时存在一个连接。在PAIR socket进入静默状态的时候,send()方法会进入阻塞状态。

案例演示

使用Java版本的SDK,创建maven项目,引入对应的依赖:

<dependency>
   <groupId>org.zeromq</groupId>
   <artifactId>jeromq</artifactId>
   <version>0.6.0</version>
</dependency>

REQ和REP

服务端:

package com.justin.reqrep;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ.Socket;
import zmq.ZMQ;

public class RepSocketServer {
    public static void main(String[] args) throws InterruptedException {
        try (ZContext context = new ZContext()) {
            Socket socket = context.createSocket(SocketType.REP);
            socket.setSendTimeOut(5000);
            socket.bind("tcp://*:5555");

            while (!Thread.currentThread().isInterrupted()) {
                byte[] recv = socket.recv(0);
                System.out.println("received: " + new String(recv, ZMQ.CHARSET));

                String res = "world";
                socket.send(res.getBytes(ZMQ.CHARSET), 0);

                Thread.sleep(1000);
            }
        }
    }
}

客户端:

package com.justin.reqrep;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

public class ReqSocketClient {
    public static void main(String[] args) {
        try (ZContext context = new ZContext()) {
            Socket socket = context.createSocket(SocketType.REQ);
            socket.setSendTimeOut(5000);
            socket.connect("tcp://localhost:5555");

            for (int i = 0;i != 100;i++) {
                String req = "Hello";
                System.out.println("Sending Hello " + i);
                socket.send(req.getBytes(ZMQ.CHARSET), 0);

                byte[] recv = socket.recv(0);
                System.out.println("received: " + new String(recv, ZMQ.CHARSET));
            }
        }
    }
}

PUB和SUB

package com.justin.pubsub;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ.Socket;
import zmq.ZMQ;

public class PubSocketServer {
    public static void main(String[] args) throws InterruptedException {
        try (ZContext context = new ZContext()) {
            Socket socket = context.createSocket(SocketType.PUB);
            socket.bind("tcp://*:5555");

            int i = 0;
            while (i <= 10) {
                String res = "status world";
                socket.send(res.getBytes(ZMQ.CHARSET), 0);

                i++;

                Thread.sleep(1000);
            }
        }
    }
}

 

package com.justin.pubsub;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

import java.util.Objects;

public class SubSocketClient {
    public static void main(String[] args) throws InterruptedException {
        try (ZContext context = new ZContext()) {
            Socket socket = context.createSocket(SocketType.SUB);
            socket.connect("tcp://localhost:5555");

            socket.subscribe("status");

            while (true) {
                Thread.sleep(1000);

                byte[] recv = socket.recv(0);
                if (Objects.isNull(recv)) {
                    continue;
                }

                System.out.println("received: " + new String(recv, ZMQ.CHARSET));
            }
        }
    }
}

XPUB和XSUB

代理模式

代理配置

package com.justin.xpubxsubproxy;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ.Socket;
import zmq.ZMQ;

public class Proxy {
    /**
     * 代理模式,充当发布者和订阅者之间的代理
     */
    public static void method1() {
        try (ZContext context = new ZContext()) {
            Socket xpubSocket = context.createSocket(SocketType.XPUB);
            xpubSocket.bind("tcp://*:5555");

            Socket xsubSocket = context.createSocket(SocketType.XSUB);
            xsubSocket.bind("tcp://*:5556");

            ZMQ.proxy(xsubSocket.base(), xpubSocket.base(), null);

            xpubSocket.close();
            xsubSocket.close();
        }
    }

    public static void main(String[] args) {
        method1();
    }
}

发布者

package com.justin.xpubxsubproxy;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ.Socket;
import zmq.ZMQ;

public class XPubSocketDemo {
    public static void main(String[] args) throws InterruptedException {
        try (ZContext context = new ZContext()) {
            Socket socket = context.createSocket(SocketType.PUB);
            socket.connect("tcp://localhost:5556");

            int i = 0;
            while (i <= 10) {
                String res = "status hello world";
                socket.send(res.getBytes(ZMQ.CHARSET), 0);

                i++;

                Thread.sleep(1000);
            }
        }
    }
}

订阅者

package com.justin.xpubxsubproxy;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

import java.util.Objects;

public class XSubSocketDemo {
    public static void main(String[] args) throws InterruptedException {
        try (ZContext context = new ZContext()) {
            Socket socket = context.createSocket(SocketType.SUB);
            socket.connect("tcp://localhost:5555");

            socket.subscribe("status".getBytes(zmq.ZMQ.CHARSET));

            while (true) {
                Thread.sleep(1000);

                byte[] recv = socket.recv(0);
                if (Objects.isNull(recv)) {
                    continue;
                }

                System.out.println("received: " + new String(recv, ZMQ.CHARSET));
            }
        }
    }
}

监控订阅和取消订阅事件模式

package com.justin.xpubxsub;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import zmq.ZMQ;

public class PollerDemo {

    /**
     * 监控订阅和取消订阅事件
     */
    public static void method2() {
        try (ZContext context = new ZContext()) {
            Socket xpubSocket = context.createSocket(SocketType.XPUB);
            xpubSocket.bind("tcp://*:5555");

            Socket xsubSocket = context.createSocket(SocketType.XSUB);
            xsubSocket.bind("tcp://*:5556");

            Poller poller = context.createPoller(2);
            poller.register(xpubSocket, org.zeromq.ZMQ.Poller.POLLIN);
            poller.register(xsubSocket, org.zeromq.ZMQ.Poller.POLLIN);

            while (!Thread.currentThread().isInterrupted()) {
                poller.poll();

                if (poller.pollin(0)) {
                    byte[] message = xpubSocket.recv(0);
                    if (message[0] == 1) {
                        System.out.println("Subscription: " + new String(message, 1, message.length - 1, ZMQ.CHARSET));
                    } else if (message[0] == 0) {
                        System.out.println("Unsubscription: " + new String(message, 1, message.length - 1, ZMQ.CHARSET));
                    }
                    xsubSocket.send(message, 0);
                }

                if (poller.pollin(1)) {
                    byte[] message = xsubSocket.recv(0);
                    xpubSocket.send(message, 0);
                }
            }

            xpubSocket.close();
            xsubSocket.close();
        }
    }

    public static void main(String[] args) {
        method2();
    }
}

发布者

package com.justin.xpubxsub;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ.Socket;
import zmq.ZMQ;

public class XPubSocketDemo {
    public static void main(String[] args) throws InterruptedException {
        try (ZContext context = new ZContext()) {
            Socket socket = context.createSocket(SocketType.XPUB);
            socket.connect("tcp://localhost:5556");

            int i = 0;
            while (i <= 10) {
                String res = "status hello world";
                socket.send(res.getBytes(ZMQ.CHARSET), 0);

                i++;

                Thread.sleep(1000);
            }
        }
    }
}

订阅者

package com.justin.xpubxsub;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Objects;

public class XSubSocketDemo {
    public static void main(String[] args) throws InterruptedException, IOException {
        try (ZContext context = new ZContext()) {
            Socket socket = context.createSocket(SocketType.XSUB);
            socket.connect("tcp://localhost:5555");

            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            outputStream.write(new byte[]{1});
            outputStream.write("status".getBytes(zmq.ZMQ.CHARSET));

            socket.send(outputStream.toByteArray());

            while (true) {
                Thread.sleep(1000);

                byte[] recv = socket.recv(0);
                if (Objects.isNull(recv)) {
                    continue;
                }

                System.out.println("received: " + new String(recv, ZMQ.CHARSET));
            }
        }
    }
}

PUSH和PULL

任务分发节点

package com.justin.pipeline;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ.Socket;
import zmq.ZMQ;

/**
 * 任务分发节点
 */
public class PushSocketDemo {
    public static void main(String[] args) throws InterruptedException {
        try (ZContext context = new ZContext()) {
            Socket socket = context.createSocket(SocketType.PUSH);
            socket.bind("tcp://*:5555");

            for (int i = 0; i < 10; i++) {
                String task = "Task # " + i;
                socket.send(task.getBytes(ZMQ.CHARSET), 0);
                System.out.println("Sent: " + task);
                Thread.sleep(1000);
            }

            socket.close();
        }
    }
}

工作节点

package com.justin.pipeline;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ.Socket;
import zmq.ZMQ;

/**
 * 工作节点
 */
public class PullSocketDemo {
    public static void main(String[] args) throws InterruptedException {
        try (ZContext context = new ZContext()) {
            Socket socket = context.createSocket(SocketType.PULL);
            socket.connect("tcp://localhost:5555");

            while (!Thread.currentThread().isInterrupted()) {
                String recvStr = socket.recvStr(0);
                System.out.println("Received: " + recvStr);
            }

            socket.close();
        }
    }
}
java
zeromq

关于作者

justin
123456
获得点赞
文章被阅读