添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
沉着的烈马  ·  折腾前必看--Remarkable ...·  4 月前    · 
眼睛小的酸菜鱼  ·  502 Bad Gateway·  4 月前    · 
高大的包子  ·  OpenCV(cv::medianBlur( ...·  5 月前    · 
老实的玉米  ·  Ebook Translator:用 ...·  6 月前    · 
近视的桔子  ·  阿里云CDN ...·  6 月前    · 
自动

RabbitMQ 学习笔记

2022-08-16
2分钟阅读时长

RabbitMQ tutorial in Spring AMQP

HelloWorld

@SpringBootApplication
public class LearnRabbitmqHelloWorldApplication {
    public static void main(String[] args) {
        SpringApplication.run(LearnRabbitmqHelloWorldApplication.class, args).close();
    @Bean
    public ApplicationRunner runner(AmqpTemplate template) {
        return args -> template.convertAndSend("myqueue", "foo");
    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    @RabbitListener(queues = "myqueue")
    public void listen(String in) {
        System.out.println(in);

代码为使用 Spring AMQP 简单实现一个生产者和消费者的例子

值得注意的是,如果 RabbitListener 或者 convertAndSend 指定的 queue 不存在的话,启动时会报错,可以通过声明一个 Queue 类型的 bean 让 AMQP 扫描到之后自动在 RabbitMQ 中创建

Work Queues

Work Queues 的目的是创建一个用于在多个消费者之间分配耗时任务的队列,

@SpringBootApplication
public class LearnrabbitmqApplication {
    public static void main(String[] args) {
        SpringApplication.run(LearnrabbitmqApplication.class, args);
    AtomicInteger dots = new AtomicInteger(0);
    AtomicInteger count = new AtomicInteger(0);
    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    @Bean
    public ApplicationRunner runner(AmqpTemplate template, Queue queue) {
        return args -> {
            for (int i = 0; i < 5; i++) {
                StringBuilder builder = new StringBuilder("Hello");
                if (dots.incrementAndGet() == 4) {
                    dots.set(1);
                builder.append(".".repeat(Math.max(0, dots.get())));
                builder.append(count.incrementAndGet());
                String message = builder.toString();
                template.convertAndSend(queue.getName(), message);
                System.out.println(" [x] Sent '" + message + "'");
    @Bean
    public LongTaskReceiver rec1() {
        return new LongTaskReceiver(1);
    @Bean
    public LongTaskReceiver rec2() {
        return new LongTaskReceiver(2);
@RabbitListener(queues = "myqueue")
class LongTaskReceiver {
    private final Integer flag;
    LongTaskReceiver(Integer flag) {
        this.flag = flag;
    @RabbitHandler
    public void receive(String in) throws InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        System.out.println("instance " + this.flag +
                " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        System.out.println("instance " + this.flag +
                " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    private void doWork(String in) throws InterruptedException {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);