RabbitMQ 学习笔记
2022-08-16
2分钟阅读时长
RabbitMQ tutorial in Spring AMQP
HelloWorld
@SpringBootApplication
public class LearnRabbitmqHelloWorldApplication {
public static void main(String[] args) {
SpringApplication.run(LearnRabbitmqHelloWorldApplication.class, args).close();
@Bean
public ApplicationRunner runner(AmqpTemplate template) {
return args -> template.convertAndSend("myqueue", "foo");
@Bean
public Queue myQueue() {
return new Queue("myqueue");
@RabbitListener(queues = "myqueue")
public void listen(String in) {
System.out.println(in);
代码为使用 Spring AMQP 简单实现一个生产者和消费者的例子
值得注意的是,如果 RabbitListener 或者 convertAndSend 指定的 queue 不存在的话,启动时会报错,可以通过声明一个
Queue
类型的 bean 让 AMQP 扫描到之后自动在 RabbitMQ 中创建
Work Queues
Work Queues 的目的是创建一个用于在多个消费者之间分配耗时任务的队列,
@SpringBootApplication
public class LearnrabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(LearnrabbitmqApplication.class, args);
AtomicInteger dots = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
@Bean
public Queue myQueue() {
return new Queue("myqueue");
@Bean
public ApplicationRunner runner(AmqpTemplate template, Queue queue) {
return args -> {
for (int i = 0; i < 5; i++) {
StringBuilder builder = new StringBuilder("Hello");
if (dots.incrementAndGet() == 4) {
dots.set(1);
builder.append(".".repeat(Math.max(0, dots.get())));
builder.append(count.incrementAndGet());
String message = builder.toString();
template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
@Bean
public LongTaskReceiver rec1() {
return new LongTaskReceiver(1);
@Bean
public LongTaskReceiver rec2() {
return new LongTaskReceiver(2);
@RabbitListener(queues = "myqueue")
class LongTaskReceiver {
private final Integer flag;
LongTaskReceiver(Integer flag) {
this.flag = flag;
@RabbitHandler
public void receive(String in) throws InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
System.out.println("instance " + this.flag +
" [x] Received '" + in + "'");
doWork(in);
watch.stop();
System.out.println("instance " + this.flag +
" [x] Done in " + watch.getTotalTimeSeconds() + "s");
private void doWork(String in) throws InterruptedException {
for (char ch : in.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);