RabbitMQ-Java-08-備份交換機

csdn_yasin 2022-01-07 11:12:07 阅读数:15

rabbitmq-java-08- rabbitmq java

說明

  • RabbitMQ-Java-08-備份交換機
  • 本案例是一個Maven+SpringBoot項目
  • 假設你已經實現了上一節發布確認高級
  • 官方文檔已包含絕大多數本案例內容。請移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/

核心概念

》備份交換機說明

  • 備份交換機是為了解决什麼問題呢?
    • 跟發布確認高級差不多,也是為了解决防止消息丟失問題
    • 無法投遞的消息將轉發給備份交換機
  • 主要操作有哪些呢?
    • 新建配置類
      • 新建一個普通交換機(direct),通過附加參數聲明備份交換機
        return ExchangeBuilder.directExchange(EXCHANGE_NORMAL).withArgument(
        "alternate-exchange", EXCHANGE_BACKUP // 指定備份交換機
        ).build();
        
      • 新建一個普通隊列
      • 綁定普通隊列、交換機、路由key
      • 新建一個備份交換機(fanout)
      • 新建一個備份隊列
      • 綁定備份隊列、交換機
    • 新建消費者組件
      • 正常隊列消費者
      • 備份隊列消費者
    • 控制器
      • 生產者

操作步驟

》完整代碼

  • application.properties
    spring.rabbitmq.host=192.168.3.202
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456
    spring.rabbitmq.publisher-confirm-type=correlated
    spring.rabbitmq.publisher-returns=true
    
  • MyBackupConfig
    package cn.cnyasin.rabbit.config;
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    @Configuration
    public class MyBackupConfig {
    // 交換機
    public static final String EXCHANGE_NORMAL = "exchange_normal";
    public static final String EXCHANGE_BACKUP = "exchange_backup";
    // 隊列
    public static final String QUEUE_NORMAL = "queue_normal";
    public static final String QUEUE_BACKUP = "queue_backup";
    // 路由key
    public static final String ROUTING_NORMAL = "routing_normal";
    // 聲明交換機
    @Bean
    public DirectExchange exchangeNormal() {
    return ExchangeBuilder.directExchange(EXCHANGE_NORMAL).withArgument(
    "alternate-exchange", EXCHANGE_BACKUP // 指定備份交換機
    ).build();
    }
    @Bean
    public FanoutExchange exchangeBackup() {
    return new FanoutExchange(EXCHANGE_BACKUP);
    }
    // 聲明隊列
    @Bean
    public Queue queueNormal() {
    return QueueBuilder.durable(QUEUE_NORMAL).build();
    }
    @Bean
    public Queue queueBackup() {
    return QueueBuilder.durable(QUEUE_BACKUP).build();
    }
    // 綁定隊列、交換機、路由key
    @Bean
    public Binding queueNormalBindExchangeNormal(
    @Qualifier("queueNormal") Queue queue,
    @Qualifier("exchangeNormal") Exchange exchange
    ) {
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NORMAL).noargs();
    }
    @Bean
    public Binding queueBackupBindExchangeBackup(
    @Qualifier("queueBackup") Queue queue,
    @Qualifier("exchangeBackup") Exchange exchange
    ) {
    return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }
    }
    
  • MyBackupConsumer
    package cn.cnyasin.rabbit.consumer;
    import cn.cnyasin.rabbit.config.MyBackupConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import java.util.Date;
    @Slf4j
    @Component
    public class MyBackupConsumer {
    /**
    * 正常隊列消費者
    *
    * @param message
    */
    @RabbitListener(queues = MyBackupConfig.QUEUE_NORMAL)
    public void queueNormalConsumer(String message) {
    log.info("[*] [{}] 正常隊列收到消息:{}", new Date().toString(), message);
    }
    /**
    * 備份隊列消費者
    *
    * @param message
    */
    @RabbitListener(queues = MyBackupConfig.QUEUE_BACKUP)
    public void queueBackupConsumer(String message) {
    log.info("[*] [{}] 備份隊列收到消息:{}", new Date().toString(), message);
    }
    }
    
  • MyBackupController
    package cn.cnyasin.rabbit.controller;
    import cn.cnyasin.rabbit.config.MyBackupConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import java.util.Date;
    @Slf4j
    @RestController
    @RequestMapping("/backup")
    public class MyBackupController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
    * 生產者
    *
    * @param msg
    * @return
    */
    @RequestMapping("/send/{msg}")
    public String send(@PathVariable String msg) {
    log.info("[*] [{}] 准備發送消息:{}", new Date().toString(), msg);
    // 發送到存在的路由key
    rabbitTemplate.convertAndSend(MyBackupConfig.EXCHANGE_NORMAL, MyBackupConfig.ROUTING_NORMAL, msg);
    // 發送到不存在的路由key
    rabbitTemplate.convertAndSend(MyBackupConfig.EXCHANGE_NORMAL, "qwe", msg);
    return "ok";
    }
    }
    

備注

  • 該教程部分內容收集自網絡,感謝原作者。

附錄

版权声明:本文为[csdn_yasin]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201071112066445.html