添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

在本文中,我们将创建一个简单的 Spring 应用,用于连接到 ActiveMQ 并发送和接收消息。我们将重点关注测试这个应用以及测试 Spring JMS 的不同方法。

Spring JMS 是 Spring 框架提供的一个模块,用于支持与 Java 消息服务(Java Message Service,JMS)提供者的集成。JMS是一种用于在分布式系统中发送、接收和传递消息的标准 API。

2、应用设置

首先,创建一个用于测试的基本 Spring 应用。添加必要的依赖,并实现消息处理。

2.1、依赖

pom.xml 中添加所需的依赖。我们需要 Spring JMS 来监听 JMS 消息。我们将在部分测试中使用 ActiveMQ-Junit 启动嵌入式 ActiveMQ 实例,并在其他测试中使用 TestContainers 运行 ActiveMQ Docker 容器:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.4.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq.tooling</groupId>
    <artifactId>activemq-junit</artifactId>
    <version>5.16.5</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers</artifactId>
    <version>1.17.3</version>
    <scope>test</scope>
</dependency>

2.2、应用代码

现在,创建一个可以监听消息的 Spring 应用:

@ComponentScan
public class JmsApplication {
    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(JmsApplication.class);

我们需要创建一个配置类,使用 @EnableJms 注解启用 JMS,并配置 ConnectionFactory 以连接到 ActiveMQ 实例:

@Configuration
@EnableJms
public class JmsConfig {
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        return factory;
    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:61616");
    @Bean
    public JmsTemplate jmsTemplate() {
        return new JmsTemplate(connectionFactory());

接着,创建可以接收和处理消息的监听器(Listener):

@Component
public class MessageListener {
    private static final Logger logger = LoggerFactory.getLogger(MessageListener.class);
    @JmsListener(destination = "queue-1")
    public void sampleJmsListenerMethod(TextMessage message) throws JMSException {
        logger.info("JMS listener received text message: {}", message.getText());

还需要一个可以发送信息的类:

@Component
public class MessageSender {
    @Autowired
    private JmsTemplate jmsTemplate;
    private static final Logger logger = LoggerFactory.getLogger(MessageSender.class);
    public void sendTextMessage(String destination, String message) {
        logger.info("Sending message to {} destination with text {}", destination, message);
        jmsTemplate.send(destination, s -> s.createTextMessage(message));

3、使用嵌入式 ActiveMQ 进行测试

首先使用嵌入式 ActiveMQ 实例来测试一下我们的应用

创建测试类并添加一个管理 ActiveMQ 实例的 JUnit Rule:

@RunWith(SpringRunner.class)
public class EmbeddedActiveMqTests4 {
    @ClassRule
    public static EmbeddedActiveMQBroker embeddedBroker = new EmbeddedActiveMQBroker();
    @Test
    public void test() {
    // ...

运行这个空测试并查看日志。可以看到,测试启动了嵌入式 broker:

INFO | Starting embedded ActiveMQ broker: embedded-broker
INFO | Using Persistence Adapter: MemoryPersistenceAdapter
INFO | Apache ActiveMQ 5.14.1 (embedded-broker, ID:DESKTOP-52539-254421135-0:1) is starting
INFO | Apache ActiveMQ 5.14.1 (embedded-broker, ID:DESKTOP-52539-254421135-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
INFO | Connector vm://embedded-broker started
INFO | Successfully connected to vm://embedded-broker?create=false

测试类中的所有测试执行完后,broker 就会停止。

需要配置应用以连接到 ActiveMQ 实例,这样才能正确测试 MessageListener MessageSender 类:

@Configuration
@EnableJms
static class TestConfiguration {
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        return factory;
    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(embeddedBroker.getVmURL());
    @Bean
    public JmsTemplate jmsTemplate() {
        return new JmsTemplate(connectionFactory());

该类使用一个特殊的 ConnectionFactory ,可从嵌入式 broker 获取 URL。现在,需要在包含测试的类上添加 @ContextConfiguration 注解,从而使用该配置:

@ContextConfiguration(classes = { TestConfiguration.class, MessageSender.class }) public class EmbeddedActiveMqTests {

3.1、发送消息

编写第一个测试,检查 MessageSender 类的功能。首先,需要获取该类实例的引用,只需将其作为字段注入即可:

@Autowired
private MessageSender messageSender;

向 ActiveMQ 发送一条简单的消息,并添加一些断言来检查功能:

@Test
public void whenSendingMessage_thenCorrectQueueAndMessageText() throws JMSException {
    String queueName = "queue-2";
    String messageText = "Test message";
    messageSender.sendTextMessage(queueName, messageText);
    assertEquals(1, embeddedBroker.getMessageCount(queueName));
    TextMessage sentMessage = embeddedBroker.peekTextMessage(queueName);
    assertEquals(messageText, sentMessage.getText());

消息发送后,队列中确实包含了一个具有正确文本的条目。现在可以确定 MessageSender 运行正常,

3.2、接收消息

接着,测试 Listener 类。先创建一个新的测试方法,然后用嵌入式 broker 发送一条消息。

Listener 的目标(destination)设置为了 “queue-1” ,所以需要确保在这里使用相同的名称。

使用 Mockito 来测试 Listener。通过 @SpyBean 注解来获取 MessageListener 实例:

@SpyBean
private MessageListener messageListener;

然后,检查方法是否被调用,并使用 ArgumentCaptor 捕捉接收到的方法参数:

@Test
public void whenListening_thenReceivingCorrectMessage() throws JMSException {
    String queueName = "queue-1";
    String messageText = "Test message";
    assertEquals(0, embeddedBroker.getDestination(queueName).getDestinationStatistics().getDispatched().getCount());
    assertEquals(0, embeddedBroker.getDestination(queueName).getDestinationStatistics().getMessages().getCount());
    embeddedBroker.pushMessage(queueName, messageText);
    ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(TextMessage.class);
    Mockito.verify(messageListener, Mockito.timeout(100))
        .sampleJmsListenerMethod(messageCaptor.capture());
    TextMessage receivedMessage = messageCaptor.getValue();
    assertEquals(messageText, receivedMessage.getText());
    assertEquals(1, embeddedBroker.getDestination(queueName).getDestinationStatistics().getDispatched().getCount());
    assertEquals(0, embeddedBroker.getDestination(queueName).getDestinationStatistics().getMessages().getCount());

现在,现在运行测试。上述 2 个测试都将会正常的通过。

4、使用 TestContainers 进行测试

让我们看看在 Spring 应用中测试 JMS 的另一种方法。

可以使用 TestContainers 运行一个 ActiveMQ Docker 容器,并在测试中连接到它。

创建一个新的测试类,并将 Docker 容器作为 JUnit Rule 包含进来。

@RunWith(SpringRunner.class)
public class TestContainersActiveMqTests {
    @ClassRule
    public static GenericContainer<?> activeMqContainer 
      = new GenericContainer<>(DockerImageName.parse("rmohr/activemq:5.14.3")).withExposedPorts(61616);
    @Test
    public void test() throws JMSException {

运行这个测试并查看日志。可以看到一些与 TestContainers 有关的信息,它正在拉取指定的 docker 镜像并启动容器:

INFO | Creating container for image: rmohr/activemq:5.14.3
INFO | Container rmohr/activemq:5.14.3 is starting: e9b0ddcd45c54fc9994aff99d734d84b5fae14b55fdc70887c4a2c2309b229a7
INFO | Container rmohr/activemq:5.14.3 started in PT2.635S

创建一个与 ActiveMQ 类似的配置类。唯一不同的是 ConnectionFactory 的配置:

@Bean
public ConnectionFactory connectionFactory() {
    String brokerUrlFormat = "tcp://%s:%d";
    String brokerUrl = String.format(brokerUrlFormat, activeMqContainer.getHost(), activeMqContainer.getFirstMappedPort());
    return new ActiveMQConnectionFactory(brokerUrl);

4.1、发送消息

测试 MessageSender 类,看看它是否能在这个 Docker 容器上运行。这次不能使用 EmbeddedBroker 上的方法,而是使用 Spring JmsTemplate ,它也很简单:

@Autowired
private MessageSender messageSender;
@Autowired
private JmsTemplate jmsTemplate;
@Test
public void whenSendingMessage_thenCorrectQueueAndMessageText() throws JMSException {
    String queueName = "queue-2";
    String messageText = "Test message";
    messageSender.sendTextMessage(queueName, messageText);
    Message sentMessage = jmsTemplate.receive(queueName);
    Assertions.assertThat(sentMessage).isInstanceOf(TextMessage.class);
    assertEquals(messageText, ((TextMessage) sentMessage).getText());

使用 JmsTemplate 来读取队列的内容,检查 messageSender 是否发送了正确的消息。

4.2、接收消息

测试 Listener 类也没什么不同。使用 JmsTemplate 发送一条信息,并验证 Listener 是否收到了正确的文本:

@SpyBean
private MessageListener messageListener;
@Test
public void whenListening_thenReceivingCorrectMessage() throws JMSException {
    String queueName = "queue-1";
    String messageText = "Test message";
    jmsTemplate.send(queueName, s -> s.createTextMessage(messageText));
    ArgumentCaptor<TextMessage> messageCaptor = ArgumentCaptor.forClass(TextMessage.class);
    Mockito.verify(messageListener, Mockito.timeout(100)).sampleJmsListenerMethod(messageCaptor.capture());
    TextMessage receivedMessage = messageCaptor.getValue();
    assertEquals(messageText, receivedMessage.getText());

本文介绍了使用嵌入式 ActiveMQ 和 TestContainers 来测试 Spring JMS 的 2 种方式。推荐使用 TestContainers,因为用 docker 容器,可以更好地模拟真实的场景。

参考:https://www.baeldung.com/spring-jms-testing

  • Kafka 中的 InstanceAlreadyExistsException 异常
  • 使用 MongoDB 和 Spring AI 构建 RAG 应用
  • Spring Boot v3.3.4 发布
  • 使用 OpenFeign 和 CompletableFuture 并行处理多个 HTTP 请求
  • 使用 Stream API 处理 JDBC ResultSet
  •