當(dāng)前位置:首頁 > 公眾號精選 > 架構(gòu)師社區(qū)
[導(dǎo)讀]我們知道,消息在RabbitMQ的整個生命周期是生產(chǎn)者投遞消息到Exchange,Exchange根據(jù)路由鍵將消息路由到合適的Queue,Queue再將消息推(或消費者主動拉)給消費者。

我們知道,消息在RabbitMQ的整個生命周期是生產(chǎn)者投遞消息Exchange,Exchange根據(jù)路由鍵消息路由到合適的QueueQueue再將消息推(或消費者主動拉)給消費者。

在這個過程當(dāng)中,Exchange根據(jù)路由鍵將消息路由到合適的Queue的過程,可能發(fā)生諸如

  1. Exchange沒有任何Queue與其綁定,
  2. 或者根據(jù)消息的路由鍵,沒有任何一個合適的Queue來投遞消息,

從而導(dǎo)致消息路由失敗。對于這些路由失敗的消息應(yīng)該如何處理呢?有兩種方式:

  1. 將消息返回給投遞該條消息的生產(chǎn)者。
  2. 使用備份交換機 alternate-exchange(AE)。

方式1:將消息返回給投遞該條消息的生產(chǎn)者

  • 配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing

# 當(dāng)exchange無法找到任何一個合適的queue時,將消息return給生產(chǎn)者
spring.rabbitmq.template.mandatory=true
# 必須設(shè)置為true,否則消息消息路由失敗也無法觸發(fā)Return回調(diào)
spring.rabbitmq.publisher-returns=true
  • 交換機定義與消息發(fā)送
@Slf4j @Component public class NoMatchQueue { /**
     * 交換機名稱
     */ public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE"; @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void send() {
        log.info("發(fā)送消息");
        Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
        Message message = MessageBuilder
                .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
                .setContentEncoding(StandardCharsets.UTF_8.displayName())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .build();
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
    }
} @Configuration class ExchangeDeclare { /**
     * 只定義一個交換機,但是不綁定任何Queue,所以發(fā)送到該Exchange的消息都會路由失敗
     *
     * @return */ @Bean public Exchange noMatchQueueExchange() { return ExchangeBuilder
                .topicExchange(NoMatchQueue.EXCHANGE_NAME)
                .durable(true)
                .build();
    }
}
  • 設(shè)置回調(diào)函數(shù)
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("消息被退回:{}", returnedMessage);
    }
});
  • 消息被退回:且可以看到原因是無法路由

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機AE)


方式2:使用備份交換機

使用方式1需要我們在程序中進行編碼設(shè)置回調(diào)函數(shù)監(jiān)聽,增加了生產(chǎn)者代碼的復(fù)雜性,那么為了消息不丟失還有沒有其他方式來處理路由失敗的消息呢:答案是使用備份交換機。

  • 相較于使用回調(diào)函數(shù),使用備份交換機只需要給交換機綁定一個備份交換機即可,當(dāng)消息路由失敗之后,消息將投遞到備份交換機,再由備份交換機路由消息到備份隊列。這樣我們只需要關(guān)注這個備份隊列就能知道/獲取到路由失敗的消息。通常情況下備份交換的Type應(yīng)該設(shè)置為 fanout。
  • 配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing

# 當(dāng)exchange無法找到任何一個合適的queue時,將消息return給生產(chǎn)者
spring.rabbitmq.template.mandatory=false
# 必須設(shè)置為true,否則消息消息路由失敗也無法觸發(fā)Return回調(diào)
spring.rabbitmq.publisher-returns=false
  • 注意: 使用備份交換機模式,mandatory將無效,即就算mandatory設(shè)置為false,路由失敗的消息同樣會被投遞到綁定的備份交換機。
  • 正常業(yè)務(wù)交換機(不綁定隊列,使得消息一定會路由失敗)
/**
 * 業(yè)務(wù)交換機
 *
 * @return */ @Bean public Exchange noMatchQueueExchange() { return ExchangeBuilder
            .topicExchange(NoMatchQueueAlternateExchange.EXCHANGE_NAME)
            .durable(true) // 綁定備份交換機 .alternate(X_ALTERNATE)
            .build();
}
  • 備份交換機/隊列/綁定
/**
 * 備份隊列
 *
 * @return */ @Bean public Queue alternateQueue() { return QueueBuilder
            .durable("Q_ALTERNATE")
            .build();
} /**
 * 備份交換機
 *
 * @return */ @Bean public Exchange alternateExchange() { return ExchangeBuilder
            .fanoutExchange(X_ALTERNATE)
            .durable(true)
            .build();
} /**
 * 備份綁定
 *
 * @param alternateExchange
 * @param alternateQueue
 * @return */ @Bean public Binding alternateBinding(Exchange alternateExchange, Queue alternateQueue) { return BindingBuilder
            .bind(alternateQueue)
            .to(alternateExchange)
            .with("")
            .noargs();
}
  • 消息投遞
/**
 * 正常業(yè)務(wù)交換機
 */ public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE_ALTERNATE"; @Autowired private RabbitTemplate rabbitTemplate; /**
 * 發(fā)送消息
 */ @PostConstruct public void send() {
    log.info("發(fā)送消息");
    Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
    Message message = MessageBuilder
            .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
            .setContentEncoding(StandardCharsets.UTF_8.displayName())
            .setContentType(MessageProperties.CONTENT_TYPE_JSON)
            .build();
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
}
  • 結(jié)果是消息被路由到備份交換機的備份隊列

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機AE)

  • 且: 如果你同時使用了兩種方式,即(mandatory為true+Listener監(jiān)聽)和(備份交換機AlternateExchange),消息將只會路由到備份交換機,不會Return回生產(chǎn)者。


# 在原生RabbitMQ-client中演示這一過程:
@Slf4j public class AeTest { /**
     * 獲取Channel
     */ private static final Channel CHANNEL = MqChannelUtils.getChannel(); /**
     * 備份交換機
     */ private static final String X_AE = "X_AE"; /**
     * 備份交換機綁定的隊列
     */ private static final String Q_AE = "Q_AE"; /**
     * 正常業(yè)務(wù)的交換機
     */ private static final String X_1 = "X_1"; public static void main(String[] args) throws IOException { // 定義備份交換機-其實也是一個正常的交換機 CHANNEL.exchangeDeclare(X_AE, BuiltinExchangeType.FANOUT, true); // 定義備份隊列 CHANNEL.queueDeclare(Q_AE, true, false, false, null); // 綁定備份 CHANNEL.queueBind(Q_AE, X_AE, "");

        HashMap arguments = new HashMap<>(); // 綁定的備份交換機 arguments.put("alternate-exchange", X_AE); // 定義交換機 CHANNEL.exchangeDeclare(X_1, BuiltinExchangeType.TOPIC, false, false, arguments); // 添加監(jiān)聽器,看看是否還會return消息 CHANNEL.addReturnListener(new ReturnCallback() { @Override public void handle(Return returnMessage) {
                log.error("消息被退回{}", returnMessage);
            }
        }); // 嘗試向交換機發(fā)送消息(無法路由)- mandatory參數(shù)無效 CHANNEL.basicPublish(X_1, "", false, false, new AMQP.BasicProperties(), "阿依古麗".getBytes(StandardCharsets.UTF_8));
    }
}
  • 兩個交換機,正常的交換機X_1和備份交換機X_AE

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機AE)

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機AE)

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機AE)

  • 備份交換機綁定的隊列已經(jīng)接收到了路由失敗的消息

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機AE)

  • 其他要注意的點:

    • 備份交換機的Type設(shè)置為fanout比較合適,這樣可以忽略RoutingKey,避免備份交換機又路由失敗。
    • 被投遞到備份交換機的RoutingKey為消息投遞到MQ時的原始RoutingKey,不會變,這一點在其他場景下也是一樣的。
    • 使用備份交換機模式,mandatory將無效,即就算mandatory設(shè)置為false,路由失敗的消息同樣會被投遞到綁定的備份交換機。

# 源代碼

https://gitee.com/FutaoSmile/tech-sharing-mq


免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!

本站聲明: 本文章由作者或相關(guān)機構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點,本站亦不保證或承諾內(nèi)容真實性等。需要轉(zhuǎn)載請聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請及時聯(lián)系本站刪除。
換一批
延伸閱讀

9月2日消息,不造車的華為或?qū)⒋呱龈蟮莫毥谦F公司,隨著阿維塔和賽力斯的入局,華為引望愈發(fā)顯得引人矚目。

關(guān)鍵字: 阿維塔 塞力斯 華為

加利福尼亞州圣克拉拉縣2024年8月30日 /美通社/ -- 數(shù)字化轉(zhuǎn)型技術(shù)解決方案公司Trianz今天宣布,該公司與Amazon Web Services (AWS)簽訂了...

關(guān)鍵字: AWS AN BSP 數(shù)字化

倫敦2024年8月29日 /美通社/ -- 英國汽車技術(shù)公司SODA.Auto推出其旗艦產(chǎn)品SODA V,這是全球首款涵蓋汽車工程師從創(chuàng)意到認(rèn)證的所有需求的工具,可用于創(chuàng)建軟件定義汽車。 SODA V工具的開發(fā)耗時1.5...

關(guān)鍵字: 汽車 人工智能 智能驅(qū)動 BSP

北京2024年8月28日 /美通社/ -- 越來越多用戶希望企業(yè)業(yè)務(wù)能7×24不間斷運行,同時企業(yè)卻面臨越來越多業(yè)務(wù)中斷的風(fēng)險,如企業(yè)系統(tǒng)復(fù)雜性的增加,頻繁的功能更新和發(fā)布等。如何確保業(yè)務(wù)連續(xù)性,提升韌性,成...

關(guān)鍵字: 亞馬遜 解密 控制平面 BSP

8月30日消息,據(jù)媒體報道,騰訊和網(wǎng)易近期正在縮減他們對日本游戲市場的投資。

關(guān)鍵字: 騰訊 編碼器 CPU

8月28日消息,今天上午,2024中國國際大數(shù)據(jù)產(chǎn)業(yè)博覽會開幕式在貴陽舉行,華為董事、質(zhì)量流程IT總裁陶景文發(fā)表了演講。

關(guān)鍵字: 華為 12nm EDA 半導(dǎo)體

8月28日消息,在2024中國國際大數(shù)據(jù)產(chǎn)業(yè)博覽會上,華為常務(wù)董事、華為云CEO張平安發(fā)表演講稱,數(shù)字世界的話語權(quán)最終是由生態(tài)的繁榮決定的。

關(guān)鍵字: 華為 12nm 手機 衛(wèi)星通信

要點: 有效應(yīng)對環(huán)境變化,經(jīng)營業(yè)績穩(wěn)中有升 落實提質(zhì)增效舉措,毛利潤率延續(xù)升勢 戰(zhàn)略布局成效顯著,戰(zhàn)新業(yè)務(wù)引領(lǐng)增長 以科技創(chuàng)新為引領(lǐng),提升企業(yè)核心競爭力 堅持高質(zhì)量發(fā)展策略,塑強核心競爭優(yōu)勢...

關(guān)鍵字: 通信 BSP 電信運營商 數(shù)字經(jīng)濟

北京2024年8月27日 /美通社/ -- 8月21日,由中央廣播電視總臺與中國電影電視技術(shù)學(xué)會聯(lián)合牽頭組建的NVI技術(shù)創(chuàng)新聯(lián)盟在BIRTV2024超高清全產(chǎn)業(yè)鏈發(fā)展研討會上宣布正式成立。 活動現(xiàn)場 NVI技術(shù)創(chuàng)新聯(lián)...

關(guān)鍵字: VI 傳輸協(xié)議 音頻 BSP

北京2024年8月27日 /美通社/ -- 在8月23日舉辦的2024年長三角生態(tài)綠色一體化發(fā)展示范區(qū)聯(lián)合招商會上,軟通動力信息技術(shù)(集團)股份有限公司(以下簡稱"軟通動力")與長三角投資(上海)有限...

關(guān)鍵字: BSP 信息技術(shù)
關(guān)閉
關(guān)閉