添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
博客放养了一年多,今天迁移到腾讯云了,不得不说腾讯的无忧计划是真的香,以后再也不搬家了。迁移过来后,主题出了不少问题,索性从v6.0升级到最新版v8.2.1了,开始还担心又要重新配置主题了,一堆东西挺闹心的。后面发现无需修改任何配置,上传新版后直接无缝升级,以前7.0升级的时候会出现各种问题,干脆降回去了。看来后期版本,作者开始重视版本升级的用户体验了,所有配置自动兼容,很赞!
September 29th, 2021 at 09:03 pm
最近这雨是包月了啊,还是连续包月那种
July 5th, 2020 at 08:06 pm
端午节后的第一周比平常多了一天,不巧的是这一周的我还特别的闲,更加让我感觉这一周特别长。上班又不能老划水以浪费时间,所以利用这段时间进行自我提升。只是最近发生的事情比较多,又是刚收假,所以进入深度学习状态的速度比较慢,效率不高。学习还是一件比较吃自制力的事情,加油吧。
July 2nd, 2020 at 09:56 am

夜月归途

RocketMQ 分布式消息中间件-Java端入门代码示例
开发准备依赖新建Maven工程,引入依赖Maven地址:https://mvnrepository.com/art...
扫描右侧二维码阅读全文
20
2019/04

RocketMQ 分布式消息中间件-Java端入门代码示例

  • 博主:
  • 发布时间:
  • 6630次浏览
  • 10161字数
  • 分类: 技能学习
  • 开发准备

    依赖

    新建Maven工程,引入依赖

      <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
      <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
          <version>4.5.0</version>
      </dependency>  
    • 这里我还引入了一个rocketmq-example,这里面的示例代码和官方文档是一直的,可以直接参考
      <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-example -->
      <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-example</artifactId>
          <version>4.5.0</version>
      </dependency>
    • 前一步的RocketMQ服务搭建并运行无误就可以进入Java编码了

    命令

    • 这里重点说一下,防火墙请放行9876端口,如果可以请把防火墙彻底关掉。
    • 贴上CentOS7中 firewalld 的基本使用命令:

      • 启动: systemctl start firewalld
      • 关闭: systemctl stop firewalld
      • 查看状态: systemctl status firewalld
      • 开机禁用 : systemctl disable firewalld
      • 开机启用 : systemctl enable firewalld
      • 放行9876端口: firewall-cmd --zone=public --add-port=9876/tcp --permanent (--permanent永久生效,没有此参数重启后失效)
      • 重新载入: firewall-cmd --reload (修改端口后需要重新载入)
      • 查看9876端口: firewall-cmd --zone= public --query-port=9876/tcp
      • 删除9876端口: firewall-cmd --zone= public --remove-port=9876/tcp --permanent

    文档

    • 本篇代码参照自官方文档及 rocketmq-example 中的示例代码
    • 官方文档及快速上手示例地址: https://rocketmq.apache.org/docs/simple-example/
    • Producer示例代码路径: org.apache.rocketmq.example.quickstart.Producer (需导入 rocketmq-example 依赖)
    • Customer示例代码路径: org.apache.rocketmq.example.quickstart.Customer (需导入 rocketmq-example 依赖)

    <hr/>

    编码运行

    生产者

    • 代码参考,每个步骤写有注释
        package com.rocketmq.demo;
        import org.apache.rocketmq.client.exception.MQClientException;
        import org.apache.rocketmq.client.producer.DefaultMQProducer;
        import org.apache.rocketmq.client.producer.SendResult;
        import org.apache.rocketmq.common.message.Message;
        import org.apache.rocketmq.remoting.common.RemotingHelper;
         * 生产者测试
         * @author zhangkuan
         * @date 2019/4/8
        public class ProducerTest {
            public static void main(String[] args) throws MQClientException, InterruptedException {
                 * 声明并初始化一个 Producer,同时指定 Producer Group 的名称
                 * 一个应用创建一个 Producer,由应用来维护此对象,可以设置为全局对象或者单例
                 * ProducerGroup 的名称 "producerGroupName" 需要由应用来保证唯一性
                 * ProducerGroup 这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
                 * 因为服务器会回查这个 Group 下的任意一个 Producer
                DefaultMQProducer producer = new DefaultMQProducer("myProducerGroupName_1");
                 * 指定 Producer 连接的 nameServer 服务器所在地址以及端口
                 * 如果是分布式部署的多个,则用分号隔开,如:
                 * setNamesrvAddr("172.16.235.77:9876;172.16.235.78:9876");
                 * 这里只是为了方便才将地址与端口写死,实际中应该至少放在配置文件中去
                producer.setNamesrvAddr("47.75.222.71:9876");
                 * 指定自己的在 Producer Group 中的名称
                producer.setInstanceName("myProducer_1");
                 * Producer 对象在使用之前必须要调用 start 进行启动初始化
                 * 初始化一次即可,切忌不可每次发送消息时,都调用start方法
                producer.start();
                 * 一个 Producer 对象可以发送多个 topic(主题),多个 tag 的消息
                 * 本实例 send 方法采用同步调用,只要不抛异常就标识成功
                boolean flag1 = true;
                boolean flag2 = true;
                for (int i = 0; i < 8; i++) {
                    try {
                        Message msg;
                        if (i % 2 == 1) {
                             * 创建一个消息实例,指定topic、tag和消息主体。
                            msg = new Message("myTopicTest_1", flag1 ? "TagA" : "TagB", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                            flag1 = !flag1;
                        } else {
                             * 创建一个消息实例,指定topic、tag和消息主体。
                            msg = new Message("myTopicTest_2", flag2 ? "TagC" : "TagD", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                            flag2 = !flag2;
                         * 调用send message将消息传递给其中的一个节点
                        SendResult sendResult = producer.send(msg);
                        System.out.printf("%s%n", sendResult);
                    } catch (Exception e) {
                        e.printStackTrace();
                        Thread.sleep(500);
                 * 应用退出时,调用 shutdown 关闭网络连接,清理资源,从 MocketMQ 服务器上注销自己
                 * 建议应用在 JBOSS、Tomcat 等容器的退出钩子里调用 shutdown 方法
                producer.shutdown();
        }

    消费者

    • 消费者代码,同样在注释中说明
        package com.rocketmq.demo;
        import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
        import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
        import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
        import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
        import org.apache.rocketmq.client.exception.MQClientException;
        import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
        import org.apache.rocketmq.common.message.MessageExt;
        import java.util.List;
         * 消费者测试
         * @author zhangkuan
         * @date 2019/4/8
        public class CustomerTest {
             * 当前例子是 PushConsumer 用法,给用户感觉是消息从 RocketMQ 服务器推到了应用客户端。
             * 而实际 PushConsumer 内部是使用长轮询 Pull(拉取) 方式从 MetaQ 服务器拉消息,然后再回调用户 Listener方法
             * @param args
             * @throws InterruptedException
             * @throws MQClientException
            public static void main(String[] args) throws MQClientException {
                 * 声明并初始化 一个 consumer
                 * Consumer Group 组名,多个 Consumer 如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
                 * 一个应用创建一个 Consumer,由应用来维护此对象,可以设置为全局对象或者单例
                 * ConsumerGroupName 需要由应用来保证唯一
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myCustomerGroupName_1");
                 * 指定 NameServer 的地址 与端口
                 * 指定自己在 Consumer Group 组中的名称
                consumer.setNamesrvAddr("47.75.222.71:9876");
                consumer.setInstanceName("myConsumer_1");
                 * 设置 consumer 的消费策略
                 * CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
                 * CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
                 * CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和 setConsumeTimestamp() 配合使用,默认是半个小时以前
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                 * 消费者订阅消息,如下所示订阅 topic 为 myTopicTest_1 下tag为 TagA 类型的消息
                 * 订阅指定 topic 下 tags 分别等于TagC或TagD,则为:<code>consumer.subscribe("myTopicTest_2", "TagC||TagD");</code>
                 * 一个 consumer 对象可以订阅多个 topic,如下
                consumer.subscribe("myTopicTest_1", "TagA");
                consumer.subscribe("myTopicTest_2", "TagC||TagD");
                 * 注册消息监听器,如果有订阅的消息就会响应
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                     * 默认 msgs 里只有一条消息,可以通过设置 consumeMessageBatchMaxSize 参数来批量接收消息
                     * consumeThreadMin:消费线程池数量 默认最小值10
                     * consumeThreadMax:消费线程池数量 默认最大值20
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                        System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());
                        MessageExt msg = msgs.get(0);
                        System.out.printf("%s Receive New Messages: %s %n", new Object[]{Thread.currentThread().getName(), msgs});
                        System.out.println("----topic: " + msg.getTopic() + ", tag: " + msg.getTags() + ", body: " + new String(msg.getBody()));
                         * 返回消费状态
                         * CONSUME_SUCCESS 消费成功
                         * RECONSUME_LATER 消费失败,需要稍后重新消费
                        //return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
                consumer.start();
                System.out.printf("Consumer Started.%n");
    

    <hr/>

    采坑记录

    RocketMQ服务外网不能访问

    • 如果你按照上一篇的方式运行,那么你可能会遇到下面这种错误(在本地运行RocketMQ不会遇到这个问题)
      org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
      at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:634)
      at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1279)
      at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1225)
      at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:283)
      at com.rocketmq.demo.ProducerTest.main(ProducerTest.java:69)
      09:19:42.792 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
    • 那么你需要修改一些配置,在编译后的目录中的 conf 文件夹下有一个 broker.conf 配置文件,在其末尾添加如下配置:

      • vim /root/rocketmq-4.5.0/conf/broker.conf
        brokerClusterName = DefaultCluster
        brokerName = broker-a
        brokerId = 0
        deleteWhen = 04
        fileReservedTime = 48
        brokerRole = ASYNC_MASTER
        flushDiskType = ASYNC_FLUSH
        # 添加如下两行内容,'47.**.***.71'为你的服务器公网IP
        namesrvAddr=47.**.***.71:9876
        # 需要指定broker外网IP否则外网不能访问
        brokerIP1=47.**.***.71
    • 然后将broker的运行方式改为: nohup sh bin/mqbroker -c conf/broker.conf &

    RocketMQ 分布式消息中间件-Java端入门代码示例

    开发准备

    依赖

    新建Maven工程,引入依赖

      <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
      <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
          <version>4.5.0</version>
      </dependency>  
    • 这里我还引入了一个rocketmq-example,这里面的示例代码和官方文档是一直的,可以直接参考
      <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-example -->
      <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-example</artifactId>
          <version>4.5.0</version>
      </dependency>
    • 前一步的RocketMQ服务搭建并运行无误就可以进入Java编码了

    命令

    • 这里重点说一下,防火墙请放行9876端口,如果可以请把防火墙彻底关掉。
    • 贴上CentOS7中 firewalld 的基本使用命令:

      • 启动: systemctl start firewalld
      • 关闭: systemctl stop firewalld
      • 查看状态: systemctl status firewalld
      • 开机禁用 : systemctl disable firewalld
      • 开机启用 : systemctl enable firewalld
      • 放行9876端口: firewall-cmd --zone=public --add-port=9876/tcp --permanent (--permanent永久生效,没有此参数重启后失效)
      • 重新载入: firewall-cmd --reload (修改端口后需要重新载入)
      • 查看9876端口: firewall-cmd --zone= public --query-port=9876/tcp
      • 删除9876端口: firewall-cmd --zone= public --remove-port=9876/tcp --permanent

    文档

    • 本篇代码参照自官方文档及 rocketmq-example 中的示例代码
    • 官方文档及快速上手示例地址: https://rocketmq.apache.org/docs/simple-example/
    • Producer示例代码路径: org.apache.rocketmq.example.quickstart.Producer (需导入 rocketmq-example 依赖)
    • Customer示例代码路径: org.apache.rocketmq.example.quickstart.Customer (需导入 rocketmq-example 依赖)

    <hr/>

    编码运行

    生产者

    • 代码参考,每个步骤写有注释
        package com.rocketmq.demo;
        import org.apache.rocketmq.client.exception.MQClientException;
        import org.apache.rocketmq.client.producer.DefaultMQProducer;
        import org.apache.rocketmq.client.producer.SendResult;
        import org.apache.rocketmq.common.message.Message;
        import org.apache.rocketmq.remoting.common.RemotingHelper;
         * 生产者测试
         * @author zhangkuan
         * @date 2019/4/8
        public class ProducerTest {
            public static void main(String[] args) throws MQClientException, InterruptedException {
                 * 声明并初始化一个 Producer,同时指定 Producer Group 的名称
                 * 一个应用创建一个 Producer,由应用来维护此对象,可以设置为全局对象或者单例
                 * ProducerGroup 的名称 "producerGroupName" 需要由应用来保证唯一性
                 * ProducerGroup 这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
                 * 因为服务器会回查这个 Group 下的任意一个 Producer
                DefaultMQProducer producer = new DefaultMQProducer("myProducerGroupName_1");
                 * 指定 Producer 连接的 nameServer 服务器所在地址以及端口
                 * 如果是分布式部署的多个,则用分号隔开,如:
                 * setNamesrvAddr("172.16.235.77:9876;172.16.235.78:9876");
                 * 这里只是为了方便才将地址与端口写死,实际中应该至少放在配置文件中去
                producer.setNamesrvAddr("47.75.222.71:9876");
                 * 指定自己的在 Producer Group 中的名称
                producer.setInstanceName("myProducer_1");
                 * Producer 对象在使用之前必须要调用 start 进行启动初始化
                 * 初始化一次即可,切忌不可每次发送消息时,都调用start方法
                producer.start();
                 * 一个 Producer 对象可以发送多个 topic(主题),多个 tag 的消息
                 * 本实例 send 方法采用同步调用,只要不抛异常就标识成功
                boolean flag1 = true;
                boolean flag2 = true;
                for (int i = 0; i < 8; i++) {
                    try {
                        Message msg;
                        if (i % 2 == 1) {
                             * 创建一个消息实例,指定topic、tag和消息主体。
                            msg = new Message("myTopicTest_1", flag1 ? "TagA" : "TagB", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                            flag1 = !flag1;
                        } else {
                             * 创建一个消息实例,指定topic、tag和消息主体。
                            msg = new Message("myTopicTest_2", flag2 ? "TagC" : "TagD", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                            flag2 = !flag2;
                         * 调用send message将消息传递给其中的一个节点
                        SendResult sendResult = producer.send(msg);
                        System.out.printf("%s%n", sendResult);
                    } catch (Exception e) {
                        e.printStackTrace();
                        Thread.sleep(500);
                 * 应用退出时,调用 shutdown 关闭网络连接,清理资源,从 MocketMQ 服务器上注销自己
                 * 建议应用在 JBOSS、Tomcat 等容器的退出钩子里调用 shutdown 方法
                producer.shutdown();
        }

    消费者

    • 消费者代码,同样在注释中说明
        package com.rocketmq.demo;
        import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
        import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
        import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
        import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
        import org.apache.rocketmq.client.exception.MQClientException;
        import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
        import org.apache.rocketmq.common.message.MessageExt;
        import java.util.List;
         * 消费者测试
         * @author zhangkuan
         * @date 2019/4/8
        public class CustomerTest {
             * 当前例子是 PushConsumer 用法,给用户感觉是消息从 RocketMQ 服务器推到了应用客户端。
             * 而实际 PushConsumer 内部是使用长轮询 Pull(拉取) 方式从 MetaQ 服务器拉消息,然后再回调用户 Listener方法
             * @param args
             * @throws InterruptedException
             * @throws MQClientException
            public static void main(String[] args) throws MQClientException {
                 * 声明并初始化 一个 consumer
                 * Consumer Group 组名,多个 Consumer 如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
                 * 一个应用创建一个 Consumer,由应用来维护此对象,可以设置为全局对象或者单例
                 * ConsumerGroupName 需要由应用来保证唯一
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myCustomerGroupName_1");
                 * 指定 NameServer 的地址 与端口
                 * 指定自己在 Consumer Group 组中的名称
                consumer.setNamesrvAddr("47.75.222.71:9876");
                consumer.setInstanceName("myConsumer_1");
                 * 设置 consumer 的消费策略
                 * CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
                 * CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
                 * CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和 setConsumeTimestamp() 配合使用,默认是半个小时以前
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                 * 消费者订阅消息,如下所示订阅 topic 为 myTopicTest_1 下tag为 TagA 类型的消息
                 * 订阅指定 topic 下 tags 分别等于TagC或TagD,则为:<code>consumer.subscribe("myTopicTest_2", "TagC||TagD");</code>
                 * 一个 consumer 对象可以订阅多个 topic,如下
                consumer.subscribe("myTopicTest_1", "TagA");
                consumer.subscribe("myTopicTest_2", "TagC||TagD");
                 * 注册消息监听器,如果有订阅的消息就会响应
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                     * 默认 msgs 里只有一条消息,可以通过设置 consumeMessageBatchMaxSize 参数来批量接收消息
                     * consumeThreadMin:消费线程池数量 默认最小值10
                     * consumeThreadMax:消费线程池数量 默认最大值20
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                        System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());
                        MessageExt msg = msgs.get(0);
                        System.out.printf("%s Receive New Messages: %s %n", new Object[]{Thread.currentThread().getName(), msgs});
                        System.out.println("----topic: " + msg.getTopic() + ", tag: " + msg.getTags() + ", body: " + new String(msg.getBody()));
                         * 返回消费状态
                         * CONSUME_SUCCESS 消费成功
                         * RECONSUME_LATER 消费失败,需要稍后重新消费
                        //return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
                consumer.start();
                System.out.printf("Consumer Started.%n");
    

    <hr/>

    采坑记录

    RocketMQ服务外网不能访问

    • 如果你按照上一篇的方式运行,那么你可能会遇到下面这种错误(在本地运行RocketMQ不会遇到这个问题)
      org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
      at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:634)
      at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1279)
      at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1225)
      at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:283)
      at com.rocketmq.demo.ProducerTest.main(ProducerTest.java:69)
      09:19:42.792 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
    • 那么你需要修改一些配置,在编译后的目录中的 conf 文件夹下有一个 broker.conf 配置文件,在其末尾添加如下配置:

      • vim /root/rocketmq-4.5.0/conf/broker.conf
        brokerClusterName = DefaultCluster
        brokerName = broker-a
        brokerId = 0
        deleteWhen = 04
        fileReservedTime = 48
        brokerRole = ASYNC_MASTER
        flushDiskType = ASYNC_FLUSH
        # 添加如下两行内容,'47.**.***.71'为你的服务器公网IP
        namesrvAddr=47.**.***.71:9876
        # 需要指定broker外网IP否则外网不能访问
        brokerIP1=47.**.***.71
    • 然后将broker的运行方式改为: nohup sh bin/mqbroker -c conf/broker.conf &