RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機(jī)AE)
我們知道,消息在RabbitMQ的整個(gè)生命周期是生產(chǎn)者投遞消息到Exchange,Exchange根據(jù)路由鍵將消息路由到合適的Queue,Queue再將消息推(或消費(fèi)者主動(dòng)拉)給消費(fèi)者。
在這個(gè)過(guò)程當(dāng)中,Exchange根據(jù)路由鍵將消息路由到合適的Queue的過(guò)程,可能發(fā)生諸如
- Exchange沒(méi)有任何Queue與其綁定,
- 或者根據(jù)消息的路由鍵,沒(méi)有任何一個(gè)合適的Queue來(lái)投遞消息,
從而導(dǎo)致消息路由失敗。對(duì)于這些路由失敗的消息應(yīng)該如何處理呢?有兩種方式:
- 將消息返回給投遞該條消息的生產(chǎn)者。
- 使用備份交換機(jī) alternate-exchange(AE)。
方式1:將消息返回給投遞該條消息的生產(chǎn)者
- 配置
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=futao spring.rabbitmq.password=123456789 spring.rabbitmq.virtual-host=/tech-sharing # 當(dāng)exchange無(wú)法找到任何一個(gè)合適的queue時(shí),將消息return給生產(chǎn)者 spring.rabbitmq.template.mandatory=true # 必須設(shè)置為true,否則消息消息路由失敗也無(wú)法觸發(fā)Return回調(diào) spring.rabbitmq.publisher-returns=true
- 交換機(jī)定義與消息發(fā)送
@Slf4j @Component public class NoMatchQueue { /** * 交換機(jī)名稱 */ public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE"; @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void send() { log.info("發(fā)送消息"); Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus()); Message message = MessageBuilder .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8)) .setContentEncoding(StandardCharsets.UTF_8.displayName()) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .build(); rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message); } } @Configuration class ExchangeDeclare { /** * 只定義一個(gè)交換機(jī),但是不綁定任何Queue,所以發(fā)送到該Exchange的消息都會(huì)路由失敗 * * @return */ @Bean public Exchange noMatchQueueExchange() { return ExchangeBuilder .topicExchange(NoMatchQueue.EXCHANGE_NAME) .durable(true) .build(); } }
- 設(shè)置回調(diào)函數(shù)
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { log.error("消息被退回:{}", returnedMessage); } });
-
消息被退回:且可以看到原因是無(wú)法路由
方式2:使用備份交換機(jī)
使用方式1需要我們?cè)诔绦蛑羞M(jìn)行編碼設(shè)置回調(diào)函數(shù)監(jiān)聽(tīng),增加了生產(chǎn)者代碼的復(fù)雜性,那么為了消息不丟失還有沒(méi)有其他方式來(lái)處理路由失敗的消息呢:答案是使用備份交換機(jī)。
-
相較于使用回調(diào)函數(shù),使用備份交換機(jī)只需要給交換機(jī)綁定一個(gè)備份交換機(jī)即可,當(dāng)消息路由失敗之后,消息將投遞到備份交換機(jī),再由備份交換機(jī)路由消息到備份隊(duì)列。這樣我們只需要關(guān)注這個(gè)備份隊(duì)列就能知道/獲取到路由失敗的消息。通常情況下備份交換的Type應(yīng)該設(shè)置為
fanout。
-
配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing
# 當(dāng)exchange無(wú)法找到任何一個(gè)合適的queue時(shí),將消息return給生產(chǎn)者
spring.rabbitmq.template.mandatory=false
# 必須設(shè)置為true,否則消息消息路由失敗也無(wú)法觸發(fā)Return回調(diào)
spring.rabbitmq.publisher-returns=false
-
注意: 使用備份交換機(jī)模式,mandatory將無(wú)效,即就算mandatory設(shè)置為false,路由失敗的消息同樣會(huì)被投遞到綁定的備份交換機(jī)。
-
正常業(yè)務(wù)交換機(jī)(不綁定隊(duì)列,使得消息一定會(huì)路由失敗)
/**
* 業(yè)務(wù)交換機(jī)
*
* @return */ @Bean public Exchange noMatchQueueExchange() { return ExchangeBuilder
.topicExchange(NoMatchQueueAlternateExchange.EXCHANGE_NAME)
.durable(true) // 綁定備份交換機(jī) .alternate(X_ALTERNATE)
.build();
}
-
備份交換機(jī)/隊(duì)列/綁定
/**
* 備份隊(duì)列
*
* @return */ @Bean public Queue alternateQueue() { return QueueBuilder
.durable("Q_ALTERNATE")
.build();
} /**
* 備份交換機(jī)
*
* @return */ @Bean public Exchange alternateExchange() { return ExchangeBuilder
.fanoutExchange(X_ALTERNATE)
.durable(true)
.build();
} /**
* 備份綁定
*
* @param alternateExchange
* @param alternateQueue
* @return */ @Bean public Binding alternateBinding(Exchange alternateExchange, Queue alternateQueue) { return BindingBuilder
.bind(alternateQueue)
.to(alternateExchange)
.with("")
.noargs();
}
-
消息投遞
/**
* 正常業(yè)務(wù)交換機(jī)
*/ public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE_ALTERNATE"; @Autowired private RabbitTemplate rabbitTemplate; /**
* 發(fā)送消息
*/ @PostConstruct public void send() {
log.info("發(fā)送消息");
Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
Message message = MessageBuilder
.withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
.setContentEncoding(StandardCharsets.UTF_8.displayName())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
}
-
結(jié)果是消息被路由到備份交換機(jī)的備份隊(duì)列
-
且: 如果你同時(shí)使用了兩種方式,即(mandatory為true+Listener監(jiān)聽(tīng))和(備份交換機(jī)AlternateExchange),消息將只會(huì)路由到備份交換機(jī),不會(huì)Return回生產(chǎn)者。
# 在原生RabbitMQ-client中演示這一過(guò)程:
@Slf4j public class AeTest { /**
* 獲取Channel
*/ private static final Channel CHANNEL = MqChannelUtils.getChannel(); /**
* 備份交換機(jī)
*/ private static final String X_AE = "X_AE"; /**
* 備份交換機(jī)綁定的隊(duì)列
*/ private static final String Q_AE = "Q_AE"; /**
* 正常業(yè)務(wù)的交換機(jī)
*/ private static final String X_1 = "X_1"; public static void main(String[] args) throws IOException { // 定義備份交換機(jī)-其實(shí)也是一個(gè)正常的交換機(jī) CHANNEL.exchangeDeclare(X_AE, BuiltinExchangeType.FANOUT, true); // 定義備份隊(duì)列 CHANNEL.queueDeclare(Q_AE, true, false, false, null); // 綁定備份 CHANNEL.queueBind(Q_AE, X_AE, "");
HashMap arguments = new HashMap<>(); // 綁定的備份交換機(jī) arguments.put("alternate-exchange", X_AE); // 定義交換機(jī) CHANNEL.exchangeDeclare(X_1, BuiltinExchangeType.TOPIC, false, false, arguments); // 添加監(jiān)聽(tīng)器,看看是否還會(huì)return消息 CHANNEL.addReturnListener(new ReturnCallback() { @Override public void handle(Return returnMessage) {
log.error("消息被退回{}", returnMessage);
}
}); // 嘗試向交換機(jī)發(fā)送消息(無(wú)法路由)- mandatory參數(shù)無(wú)效 CHANNEL.basicPublish(X_1, "", false, false, new AMQP.BasicProperties(), "阿依古麗".getBytes(StandardCharsets.UTF_8));
}
}
-
兩個(gè)交換機(jī),正常的交換機(jī)X_1和備份交換機(jī)X_AE
-
備份交換機(jī)綁定的隊(duì)列已經(jīng)接收到了路由失敗的消息
-
其他要注意的點(diǎn):
-
備份交換機(jī)的Type設(shè)置為fanout比較合適,這樣可以忽略RoutingKey,避免備份交換機(jī)又路由失敗。
-
被投遞到備份交換機(jī)的RoutingKey為消息投遞到MQ時(shí)的原始RoutingKey,不會(huì)變,這一點(diǎn)在其他場(chǎng)景下也是一樣的。
-
使用備份交換機(jī)模式,mandatory將無(wú)效,即就算mandatory設(shè)置為false,路由失敗的消息同樣會(huì)被投遞到綁定的備份交換機(jī)。
# 源代碼
https://gitee.com/FutaoSmile/tech-sharing-mq
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問(wèn)題,請(qǐng)聯(lián)系我們,謝謝!