RabbitMQ消息路由失敗的處理方案(回調(diào)與備份交換機AE)
我們知道,消息在RabbitMQ的整個生命周期是生產(chǎn)者投遞消息到Exchange,Exchange根據(jù)路由鍵將消息路由到合適的Queue,Queue再將消息推(或消費者主動拉)給消費者。
在這個過程當(dāng)中,Exchange根據(jù)路由鍵將消息路由到合適的Queue的過程,可能發(fā)生諸如
- Exchange沒有任何Queue與其綁定,
- 或者根據(jù)消息的路由鍵,沒有任何一個合適的Queue來投遞消息,
從而導(dǎo)致消息路由失敗。對于這些路由失敗的消息應(yīng)該如何處理呢?有兩種方式:
- 將消息返回給投遞該條消息的生產(chǎn)者。
- 使用備份交換機 alternate-exchange(AE)。
方式1:將消息返回給投遞該條消息的生產(chǎn)者
- 配置
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=futao spring.rabbitmq.password=123456789 spring.rabbitmq.virtual-host=/tech-sharing # 當(dāng)exchange無法找到任何一個合適的queue時,將消息return給生產(chǎn)者 spring.rabbitmq.template.mandatory=true # 必須設(shè)置為true,否則消息消息路由失敗也無法觸發(fā)Return回調(diào) spring.rabbitmq.publisher-returns=true
- 交換機定義與消息發(fā)送
@Slf4j @Component public class NoMatchQueue { /** * 交換機名稱 */ public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE"; @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void send() { log.info("發(fā)送消息"); Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus()); Message message = MessageBuilder .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8)) .setContentEncoding(StandardCharsets.UTF_8.displayName()) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .build(); rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message); } } @Configuration class ExchangeDeclare { /** * 只定義一個交換機,但是不綁定任何Queue,所以發(fā)送到該Exchange的消息都會路由失敗 * * @return */ @Bean public Exchange noMatchQueueExchange() { return ExchangeBuilder .topicExchange(NoMatchQueue.EXCHANGE_NAME) .durable(true) .build(); } }
- 設(shè)置回調(diào)函數(shù)
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { log.error("消息被退回:{}", returnedMessage); } });
-
消息被退回:且可以看到原因是無法路由
方式2:使用備份交換機
使用方式1需要我們在程序中進行編碼設(shè)置回調(diào)函數(shù)監(jiān)聽,增加了生產(chǎn)者代碼的復(fù)雜性,那么為了消息不丟失還有沒有其他方式來處理路由失敗的消息呢:答案是使用備份交換機。
-
相較于使用回調(diào)函數(shù),使用備份交換機只需要給交換機綁定一個備份交換機即可,當(dāng)消息路由失敗之后,消息將投遞到備份交換機,再由備份交換機路由消息到備份隊列。這樣我們只需要關(guān)注這個備份隊列就能知道/獲取到路由失敗的消息。通常情況下備份交換的Type應(yīng)該設(shè)置為
fanout。
-
配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing
# 當(dāng)exchange無法找到任何一個合適的queue時,將消息return給生產(chǎn)者
spring.rabbitmq.template.mandatory=false
# 必須設(shè)置為true,否則消息消息路由失敗也無法觸發(fā)Return回調(diào)
spring.rabbitmq.publisher-returns=false
-
注意: 使用備份交換機模式,mandatory將無效,即就算mandatory設(shè)置為false,路由失敗的消息同樣會被投遞到綁定的備份交換機。
-
正常業(yè)務(wù)交換機(不綁定隊列,使得消息一定會路由失敗)
/**
* 業(yè)務(wù)交換機
*
* @return */ @Bean public Exchange noMatchQueueExchange() { return ExchangeBuilder
.topicExchange(NoMatchQueueAlternateExchange.EXCHANGE_NAME)
.durable(true) // 綁定備份交換機 .alternate(X_ALTERNATE)
.build();
}
-
備份交換機/隊列/綁定
/**
* 備份隊列
*
* @return */ @Bean public Queue alternateQueue() { return QueueBuilder
.durable("Q_ALTERNATE")
.build();
} /**
* 備份交換機
*
* @return */ @Bean public Exchange alternateExchange() { return ExchangeBuilder
.fanoutExchange(X_ALTERNATE)
.durable(true)
.build();
} /**
* 備份綁定
*
* @param alternateExchange
* @param alternateQueue
* @return */ @Bean public Binding alternateBinding(Exchange alternateExchange, Queue alternateQueue) { return BindingBuilder
.bind(alternateQueue)
.to(alternateExchange)
.with("")
.noargs();
}
-
消息投遞
/**
* 正常業(yè)務(wù)交換機
*/ public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE_ALTERNATE"; @Autowired private RabbitTemplate rabbitTemplate; /**
* 發(fā)送消息
*/ @PostConstruct public void send() {
log.info("發(fā)送消息");
Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
Message message = MessageBuilder
.withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
.setContentEncoding(StandardCharsets.UTF_8.displayName())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
}
-
結(jié)果是消息被路由到備份交換機的備份隊列
-
且: 如果你同時使用了兩種方式,即(mandatory為true+Listener監(jiān)聽)和(備份交換機AlternateExchange),消息將只會路由到備份交換機,不會Return回生產(chǎn)者。
# 在原生RabbitMQ-client中演示這一過程:
@Slf4j public class AeTest { /**
* 獲取Channel
*/ private static final Channel CHANNEL = MqChannelUtils.getChannel(); /**
* 備份交換機
*/ private static final String X_AE = "X_AE"; /**
* 備份交換機綁定的隊列
*/ private static final String Q_AE = "Q_AE"; /**
* 正常業(yè)務(wù)的交換機
*/ private static final String X_1 = "X_1"; public static void main(String[] args) throws IOException { // 定義備份交換機-其實也是一個正常的交換機 CHANNEL.exchangeDeclare(X_AE, BuiltinExchangeType.FANOUT, true); // 定義備份隊列 CHANNEL.queueDeclare(Q_AE, true, false, false, null); // 綁定備份 CHANNEL.queueBind(Q_AE, X_AE, "");
HashMap arguments = new HashMap<>(); // 綁定的備份交換機 arguments.put("alternate-exchange", X_AE); // 定義交換機 CHANNEL.exchangeDeclare(X_1, BuiltinExchangeType.TOPIC, false, false, arguments); // 添加監(jiān)聽器,看看是否還會return消息 CHANNEL.addReturnListener(new ReturnCallback() { @Override public void handle(Return returnMessage) {
log.error("消息被退回{}", returnMessage);
}
}); // 嘗試向交換機發(fā)送消息(無法路由)- mandatory參數(shù)無效 CHANNEL.basicPublish(X_1, "", false, false, new AMQP.BasicProperties(), "阿依古麗".getBytes(StandardCharsets.UTF_8));
}
}
-
兩個交換機,正常的交換機X_1和備份交換機X_AE
-
備份交換機綁定的隊列已經(jīng)接收到了路由失敗的消息
-
其他要注意的點:
-
備份交換機的Type設(shè)置為fanout比較合適,這樣可以忽略RoutingKey,避免備份交換機又路由失敗。
-
被投遞到備份交換機的RoutingKey為消息投遞到MQ時的原始RoutingKey,不會變,這一點在其他場景下也是一樣的。
-
使用備份交換機模式,mandatory將無效,即就算mandatory設(shè)置為false,路由失敗的消息同樣會被投遞到綁定的備份交換機。
# 源代碼
https://gitee.com/FutaoSmile/tech-sharing-mq
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!