當(dāng)前位置:首頁(yè) > 公眾號(hào)精選 > 架構(gòu)師社區(qū)
[導(dǎo)讀]我們知道,消息在RabbitMQ的整個(gè)生命周期是生產(chǎn)者投遞消息到Exchange,Exchange根據(jù)路由鍵將消息路由到合適的Queue,Queue再將消息推(或消費(fèi)者主動(dòng)拉)給消費(fèi)者。

我們知道,消息在RabbitMQ的整個(gè)生命周期是生產(chǎn)者投遞消息Exchange,Exchange根據(jù)路由鍵消息路由到合適的Queue,Queue再將消息推(或消費(fèi)者主動(dòng)拉)給消費(fèi)者。

在這個(gè)過(guò)程當(dāng)中,Exchange根據(jù)路由鍵將消息路由到合適的Queue的過(guò)程,可能發(fā)生諸如

  1. Exchange沒(méi)有任何Queue與其綁定,
  2. 或者根據(jù)消息的路由鍵,沒(méi)有任何一個(gè)合適的Queue來(lái)投遞消息,

從而導(dǎo)致消息路由失敗。對(duì)于這些路由失敗的消息應(yīng)該如何處理呢?有兩種方式:

  1. 將消息返回給投遞該條消息的生產(chǎn)者。
  2. 使用備份交換機(jī) alternate-exchange(AE)。

方式1:將消息返回給投遞該條消息的生產(chǎn)者

  • 配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing

# 當(dāng)exchange無(wú)法找到任何一個(gè)合適的queue時(shí),將消息return給生產(chǎn)者
spring.rabbitmq.template.mandatory=true
# 必須設(shè)置為true,否則消息消息路由失敗也無(wú)法觸發(fā)Return回調(diào)
spring.rabbitmq.publisher-returns=true
  • 交換機(jī)定義與消息發(fā)送
@Slf4j @Component public class NoMatchQueue { /**
     * 交換機(jī)名稱
     */ public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE"; @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void send() {
        log.info("發(fā)送消息");
        Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
        Message message = MessageBuilder
                .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
                .setContentEncoding(StandardCharsets.UTF_8.displayName())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .build();
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
    }
} @Configuration class ExchangeDeclare { /**
     * 只定義一個(gè)交換機(jī),但是不綁定任何Queue,所以發(fā)送到該Exchange的消息都會(huì)路由失敗
     *
     * @return */ @Bean public Exchange noMatchQueueExchange() { return ExchangeBuilder
                .topicExchange(NoMatchQueue.EXCHANGE_NAME)
                .durable(true)
                .build();
    }
}
  • 設(shè)置回調(diào)函數(shù)
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("消息被退回:{}", returnedMessage);
    }
});
  • 消息被退回:且可以看到原因是無(wú)法路由

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機(jī)AE)


方式2:使用備份交換機(jī)

使用方式1需要我們?cè)诔绦蛑羞M(jìn)行編碼設(shè)置回調(diào)函數(shù)監(jiān)聽(tīng),增加了生產(chǎn)者代碼的復(fù)雜性,那么為了消息不丟失還有沒(méi)有其他方式來(lái)處理路由失敗的消息呢:答案是使用備份交換機(jī)

  • 相較于使用回調(diào)函數(shù),使用備份交換機(jī)只需要給交換機(jī)綁定一個(gè)備份交換機(jī)即可,當(dāng)消息路由失敗之后,消息將投遞到備份交換機(jī),再由備份交換機(jī)路由消息到備份隊(duì)列。這樣我們只需要關(guān)注這個(gè)備份隊(duì)列就能知道/獲取到路由失敗的消息。通常情況下備份交換的Type應(yīng)該設(shè)置為 fanout。
  • 配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing

# 當(dāng)exchange無(wú)法找到任何一個(gè)合適的queue時(shí),將消息return給生產(chǎn)者
spring.rabbitmq.template.mandatory=false
# 必須設(shè)置為true,否則消息消息路由失敗也無(wú)法觸發(fā)Return回調(diào)
spring.rabbitmq.publisher-returns=false
  • 注意: 使用備份交換機(jī)模式,mandatory將無(wú)效,即就算mandatory設(shè)置為false,路由失敗的消息同樣會(huì)被投遞到綁定的備份交換機(jī)。
  • 正常業(yè)務(wù)交換機(jī)(不綁定隊(duì)列,使得消息一定會(huì)路由失敗)
/**
 * 業(yè)務(wù)交換機(jī)
 *
 * @return */ @Bean public Exchange noMatchQueueExchange() { return ExchangeBuilder
            .topicExchange(NoMatchQueueAlternateExchange.EXCHANGE_NAME)
            .durable(true) // 綁定備份交換機(jī) .alternate(X_ALTERNATE)
            .build();
}
  • 備份交換機(jī)/隊(duì)列/綁定
/**
 * 備份隊(duì)列
 *
 * @return */ @Bean public Queue alternateQueue() { return QueueBuilder
            .durable("Q_ALTERNATE")
            .build();
} /**
 * 備份交換機(jī)
 *
 * @return */ @Bean public Exchange alternateExchange() { return ExchangeBuilder
            .fanoutExchange(X_ALTERNATE)
            .durable(true)
            .build();
} /**
 * 備份綁定
 *
 * @param alternateExchange
 * @param alternateQueue
 * @return */ @Bean public Binding alternateBinding(Exchange alternateExchange, Queue alternateQueue) { return BindingBuilder
            .bind(alternateQueue)
            .to(alternateExchange)
            .with("")
            .noargs();
}
  • 消息投遞
/**
 * 正常業(yè)務(wù)交換機(jī)
 */ public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE_ALTERNATE"; @Autowired private RabbitTemplate rabbitTemplate; /**
 * 發(fā)送消息
 */ @PostConstruct public void send() {
    log.info("發(fā)送消息");
    Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
    Message message = MessageBuilder
            .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
            .setContentEncoding(StandardCharsets.UTF_8.displayName())
            .setContentType(MessageProperties.CONTENT_TYPE_JSON)
            .build();
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
}
  • 結(jié)果是消息被路由到備份交換機(jī)的備份隊(duì)列

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機(jī)AE)

  • 且: 如果你同時(shí)使用了兩種方式,即(mandatory為true+Listener監(jiān)聽(tīng))和(備份交換機(jī)AlternateExchange),消息將只會(huì)路由到備份交換機(jī),不會(huì)Return回生產(chǎn)者。


# 在原生RabbitMQ-client中演示這一過(guò)程:
@Slf4j public class AeTest { /**
     * 獲取Channel
     */ private static final Channel CHANNEL = MqChannelUtils.getChannel(); /**
     * 備份交換機(jī)
     */ private static final String X_AE = "X_AE"; /**
     * 備份交換機(jī)綁定的隊(duì)列
     */ private static final String Q_AE = "Q_AE"; /**
     * 正常業(yè)務(wù)的交換機(jī)
     */ private static final String X_1 = "X_1"; public static void main(String[] args) throws IOException { // 定義備份交換機(jī)-其實(shí)也是一個(gè)正常的交換機(jī) CHANNEL.exchangeDeclare(X_AE, BuiltinExchangeType.FANOUT, true); // 定義備份隊(duì)列 CHANNEL.queueDeclare(Q_AE, true, false, false, null); // 綁定備份 CHANNEL.queueBind(Q_AE, X_AE, "");

        HashMap arguments = new HashMap<>(); // 綁定的備份交換機(jī) arguments.put("alternate-exchange", X_AE); // 定義交換機(jī) CHANNEL.exchangeDeclare(X_1, BuiltinExchangeType.TOPIC, false, false, arguments); // 添加監(jiān)聽(tīng)器,看看是否還會(huì)return消息 CHANNEL.addReturnListener(new ReturnCallback() { @Override public void handle(Return returnMessage) {
                log.error("消息被退回{}", returnMessage);
            }
        }); // 嘗試向交換機(jī)發(fā)送消息(無(wú)法路由)- mandatory參數(shù)無(wú)效 CHANNEL.basicPublish(X_1, "", false, false, new AMQP.BasicProperties(), "阿依古麗".getBytes(StandardCharsets.UTF_8));
    }
}
  • 兩個(gè)交換機(jī),正常的交換機(jī)X_1和備份交換機(jī)X_AE

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機(jī)AE)

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機(jī)AE)

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機(jī)AE)

  • 備份交換機(jī)綁定的隊(duì)列已經(jīng)接收到了路由失敗的消息

RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機(jī)AE)

  • 其他要注意的點(diǎn):

    • 備份交換機(jī)的Type設(shè)置為fanout比較合適,這樣可以忽略RoutingKey,避免備份交換機(jī)又路由失敗。
    • 被投遞到備份交換機(jī)的RoutingKey為消息投遞到MQ時(shí)的原始RoutingKey,不會(huì)變,這一點(diǎn)在其他場(chǎng)景下也是一樣的。
    • 使用備份交換機(jī)模式,mandatory將無(wú)效,即就算mandatory設(shè)置為false,路由失敗的消息同樣會(huì)被投遞到綁定的備份交換機(jī)。

# 源代碼

https://gitee.com/FutaoSmile/tech-sharing-mq


免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問(wèn)題,請(qǐng)聯(lián)系我們,謝謝!

本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點(diǎn),本站亦不保證或承諾內(nèi)容真實(shí)性等。需要轉(zhuǎn)載請(qǐng)聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請(qǐng)及時(shí)聯(lián)系本站刪除。
換一批
延伸閱讀

9月2日消息,不造車的華為或?qū)⒋呱龈蟮莫?dú)角獸公司,隨著阿維塔和賽力斯的入局,華為引望愈發(fā)顯得引人矚目。

關(guān)鍵字: 阿維塔 塞力斯 華為

倫敦2024年8月29日 /美通社/ -- 英國(guó)汽車技術(shù)公司SODA.Auto推出其旗艦產(chǎn)品SODA V,這是全球首款涵蓋汽車工程師從創(chuàng)意到認(rèn)證的所有需求的工具,可用于創(chuàng)建軟件定義汽車。 SODA V工具的開(kāi)發(fā)耗時(shí)1.5...

關(guān)鍵字: 汽車 人工智能 智能驅(qū)動(dòng) BSP

北京2024年8月28日 /美通社/ -- 越來(lái)越多用戶希望企業(yè)業(yè)務(wù)能7×24不間斷運(yùn)行,同時(shí)企業(yè)卻面臨越來(lái)越多業(yè)務(wù)中斷的風(fēng)險(xiǎn),如企業(yè)系統(tǒng)復(fù)雜性的增加,頻繁的功能更新和發(fā)布等。如何確保業(yè)務(wù)連續(xù)性,提升韌性,成...

關(guān)鍵字: 亞馬遜 解密 控制平面 BSP

8月30日消息,據(jù)媒體報(bào)道,騰訊和網(wǎng)易近期正在縮減他們對(duì)日本游戲市場(chǎng)的投資。

關(guān)鍵字: 騰訊 編碼器 CPU

8月28日消息,今天上午,2024中國(guó)國(guó)際大數(shù)據(jù)產(chǎn)業(yè)博覽會(huì)開(kāi)幕式在貴陽(yáng)舉行,華為董事、質(zhì)量流程IT總裁陶景文發(fā)表了演講。

關(guān)鍵字: 華為 12nm EDA 半導(dǎo)體

8月28日消息,在2024中國(guó)國(guó)際大數(shù)據(jù)產(chǎn)業(yè)博覽會(huì)上,華為常務(wù)董事、華為云CEO張平安發(fā)表演講稱,數(shù)字世界的話語(yǔ)權(quán)最終是由生態(tài)的繁榮決定的。

關(guān)鍵字: 華為 12nm 手機(jī) 衛(wèi)星通信

要點(diǎn): 有效應(yīng)對(duì)環(huán)境變化,經(jīng)營(yíng)業(yè)績(jī)穩(wěn)中有升 落實(shí)提質(zhì)增效舉措,毛利潤(rùn)率延續(xù)升勢(shì) 戰(zhàn)略布局成效顯著,戰(zhàn)新業(yè)務(wù)引領(lǐng)增長(zhǎng) 以科技創(chuàng)新為引領(lǐng),提升企業(yè)核心競(jìng)爭(zhēng)力 堅(jiān)持高質(zhì)量發(fā)展策略,塑強(qiáng)核心競(jìng)爭(zhēng)優(yōu)勢(shì)...

關(guān)鍵字: 通信 BSP 電信運(yùn)營(yíng)商 數(shù)字經(jīng)濟(jì)

北京2024年8月27日 /美通社/ -- 8月21日,由中央廣播電視總臺(tái)與中國(guó)電影電視技術(shù)學(xué)會(huì)聯(lián)合牽頭組建的NVI技術(shù)創(chuàng)新聯(lián)盟在BIRTV2024超高清全產(chǎn)業(yè)鏈發(fā)展研討會(huì)上宣布正式成立。 活動(dòng)現(xiàn)場(chǎng) NVI技術(shù)創(chuàng)新聯(lián)...

關(guān)鍵字: VI 傳輸協(xié)議 音頻 BSP

北京2024年8月27日 /美通社/ -- 在8月23日舉辦的2024年長(zhǎng)三角生態(tài)綠色一體化發(fā)展示范區(qū)聯(lián)合招商會(huì)上,軟通動(dòng)力信息技術(shù)(集團(tuán))股份有限公司(以下簡(jiǎn)稱"軟通動(dòng)力")與長(zhǎng)三角投資(上海)有限...

關(guān)鍵字: BSP 信息技術(shù)
關(guān)閉
關(guān)閉