ActiveMQ 在若依中的配置,这里使用的传输协议是stomp协议
消费时第一时间要确定对方打开了端口,是可以连接的状态。(cmd命令行测试telnet, ping)
ActiveMQ
消费消息consumeMsg:在处理业务时,需要判断JSON里面每一条数据有没有存在,当遇到不存在的JSON,需要抛出异常或者跳过本次,及时结束消费,以免造成死循环。
需要换多几条json测试,以免消费不成功
本地如何测试:
https://activemq.apache.org/download.html ;下载ActiveMQ
在\apache-activemq-5.xx.x\bin\win64 ,启动activemq.bat,不能关闭窗口
浏览器访问http://127.0.0.1:8161/ ,用户名admin ,密码admin
创建Queue ,Queue Name就是通道名称,(下面代码中,QUEUE_FLIGHT_ASSIGNPARK=“”; 需要和配合这里修改)
然后就是写完java代码之后启动项目,通过网站的对应Queue, 点击sent to;输入Message body;往通道里面放消息;
如果此时java代码没报错,并且打印出对应的log,说明就消费成功了,网站上会显示消息已经被消费了,待消费数量为0,消费者数量为1,消息出队+1;
接入测试:
cmd命令行测试telnet
对方提前往队列里面加消息
根据三方提供的host,port,user,password,queuename对应修改,连上后会消费到消息则对接成功
导入ActiveMQ需要的jar包
<!--ActiveMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.5</version>
</dependency>
#activemq
activemq:
host: 127.0.0.1
port: 61613
user: admin
password: admin
pool:
enabled: true
max-connections: 10
# #默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
# pub-sub-domain: true
package com.ruoyi.web.controller.ActiveMQ;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompConnection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.UUID;
//生产者
@Component
public class JmsProducer {
@Autowired
JmsConsumer jmsConsumer;
@Value("${spring.activemq.user}")
private String userName;
@Value("${spring.activemq.password}")
private String password;
@Value("${spring.activemq.host}")
private String host;
@Value("${spring.activemq.port}")
private Integer port;
//连接
public StompConnection connectionFactory() throws Exception {
StompConnection conn = new StompConnection();
conn.open(host, port);
conn.connect(userName, password);
return conn;
* 发送JMS消息
* @throws Exception exception
public void sendMessage(String queueName, String message)
throws Exception {
StompConnection stompConnection = connectionFactory();
String tx = UUID.randomUUID().toString().replaceAll("-", "");
HashMap<String, String> headers = new HashMap<>();
headers.put(Stomp.Headers.Send.PERSISTENT, "true");
stompConnection.begin(tx);
stompConnection.send(queueName, message, tx, headers);
stompConnection.commit(tx);
stompConnection.disconnect();
QUEUE_FLIGHT_ASSIGNPARK 是需要订阅通道名
package com.ruoyi.web.controller.ActiveMQ;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.admin.domain.entity.Dto.BullyingDto;
import com.ruoyi.admin.service.WarningService;
import com.ruoyi.common.websocket.WebSocket;
import com.ruoyi.web.controller.ActiveMQ.JmsProducer;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.SocketTimeoutException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import javax.annotation.Resource;
//消费者
@Component
public class JmsConsumer implements ApplicationListener<ContextRefreshedEvent> {
private static Logger LOG = LoggerFactory.getLogger(JmsConsumer.class);
@Resource
public JmsProducer jmsProducer;
@Resource
private WebSocket webSocket;
@Autowired
private WarningService warningService;
private static final String QUEUE_FLIGHT_ASSIGNPARK = "test001";
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
try {
consumeMsg(QUEUE_FLIGHT_ASSIGNPARK);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
* 从mq中消费消息
* @param queueName
* @throws Exception
public void consumeMsg(String queueName) throws Exception {
LOG.info("*********从消息队列中获取结果、并推送给前端**************");
new Thread(() -> {
while (true) {
StompFrame frame;
String messageId = "";
StompConnection stompConnection = null;
try {
String ack = "client";
stompConnection = jmsProducer.connectionFactory();
stompConnection.subscribe(queueName, ack);
stompConnection.keepAlive();
// 注意,如果没有接收到消息,
// 这个消费者线程会停在这里,直到本次等待超时
long waitTimeOut = 30000;
frame = stompConnection.receive(waitTimeOut);
Map<String, String> headers = frame.getHeaders();
messageId = headers.get("message-id");
LOG.info("消息id:{}", messageId);
//具体的业务处理......
//取出推送信息中的msg
String msg = frame.getBody(); LOG.info(msg);
// 在ack是client标记的情况下,确认消息
if ("client".equals(ack)) {
stompConnection.ack(messageId);
LOG.info("确认消息");
} catch (SocketTimeoutException e) {
LOG.error(e.getMessage());
continue;
} catch (JSONException ex) {
LOG.error("message_id:{},数据异常:{}", messageId, ex.getMessage());
continue;
} catch (Exception e) {
LOG.error(e.getMessage());
continue;
} finally {
try {
stompConnection.ack(messageId);
// LOG.info();
stompConnection.disconnect();
} catch (Exception e) {
LOG.error(e.getMessage());
}).start();