RabbitMQ多源配置及ack相关使用
RabbitMQ多源配置及ack相关使用
pom.xml:
<!--RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQConfig.java:
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Slf4j
@Configuration
public class RabbitMQConfig {
// mq主连接
@Bean(name = "myConnectionFactory")
@Primary
public CachingConnectionFactory publicConnectionFactory(
@Value("${v1.my.spring.rabbitmq.host}") String host,
@Value("${v1.my.spring.rabbitmq.port}") int port,
@Value("${v1.my.spring.rabbitmq.username}") String username,
@Value("${v1.my.spring.rabbitmq.password}") String password,
@Value("${v1.my.spring.rabbitmq.virtual-host}") String virtualHost,
@Value("${v1.my.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
@Value("${v1.my.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(publisherConfirms);
connectionFactory.setPublisherReturns(publisherReturns);
return connectionFactory;
}
@Bean(name = "myRabbitTemplate")
@Primary
public RabbitTemplate publicRabbitTemplate(
@Qualifier("myConnectionFactory") ConnectionFactory connectionFactory,
@Value("${v1.my.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
RabbitTemplate myRabbitTemplate = new RabbitTemplate(connectionFactory);
myRabbitTemplate.setMandatory(mandatory);
myRabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
if (!ack) {
log.info("{} 发送RabbitMQ消息 ack确认 失败: [{}]", s, JSON.toJSONString(correlationData));
} else {
log.info("{} 发送RabbitMQ消息 ack确认 成功: [{}]", s, JSON.toJSONString(correlationData));
}
});
myRabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
// LOGGER.error("{} 发送RabbitMQ消息returnedMessage,出现异常,Exchange不存在或发送至Exchange却没有发送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
});
return myRabbitTemplate;
}
@Bean(name = "myContainerFactory")
@Primary
public SimpleRabbitListenerContainerFactory insMessageListenerContainer(
@Qualifier("myConnectionFactory") ConnectionFactory connectionFactory,
@Value("${v1.my.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
@Value("${v1.my.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
factory.setPrefetchCount(prefetch);
return factory;
}
@Bean(name = "myRabbitAdmin")
@Primary
public RabbitAdmin publicRabbitAdmin(
@Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
//如果有多mq源,则把下面代码打开
// @Bean(name = "v2ConnectionFactory")
// public CachingConnectionFactory hospSyncConnectionFactory(
// @Value("${v2.spring.rabbitmq.host}") String host,
// @Value("${v2.spring.rabbitmq.port}") int port,
// @Value("${v2.spring.rabbitmq.username}") String username,
// @Value("${v2.spring.rabbitmq.password}") String password,
// @Value("${v2.spring.rabbitmq.virtual-host}") String virtualHost,
// @Value("${v2.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
// @Value("${v2.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
// CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
// connectionFactory.setHost(host);
// connectionFactory.setPort(port);
// connectionFactory.setUsername(username);
// connectionFactory.setPassword(password);
// connectionFactory.setVirtualHost(virtualHost);
// connectionFactory.setPublisherConfirms(publisherConfirms);
// connectionFactory.setPublisherReturns(publisherReturns);
// return connectionFactory;
// }
//
// @Bean(name = "v2RabbitTemplate")
// public RabbitTemplate firstRabbitTemplate(
// @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
// @Value("${v2.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
// RabbitTemplate v2RabbitTemplate = new RabbitTemplate(connectionFactory);
// v2RabbitTemplate.setMandatory(mandatory);
// v2RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
// if (!ack) {
//// LOGGER.info("{} 发送RabbitMQ消息 ack确认 失败: [{}]", this.name, JSON.toJSONString(object));
// } else {
//// LOGGER.info("{} 发送RabbitMQ消息 ack确认 成功: [{}]", this.name, JSON.toJSONString(object));
// }
// });
// v2RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
//// LOGGER.error("{} 发送RabbitMQ消息returnedMessage,出现异常,Exchange不存在或发送至Exchange却没有发送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
// });
// return v2RabbitTemplate;
// }
//
// @Bean(name = "v2ContainerFactory")
// public SimpleRabbitListenerContainerFactory hospSyncFactory(
// @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
// @Value("${v2.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
// @Value("${v2.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch
// ) {
// SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// factory.setConnectionFactory(connectionFactory);
// factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
// factory.setPrefetchCount(prefetch);
// return factory;
// }
//
// @Bean(name = "v2RabbitAdmin")
// public RabbitAdmin iqianzhanRabbitAdmin(
// @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory) {
// RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// rabbitAdmin.setAutoStartup(true);
// return rabbitAdmin;
// }
}
application.yaml:
v1:
my:
spring:
rabbitmq:
host: xxx.com
port: 5672
username: xxx
password: xxx
virtual-host: xxx
publisher-confirms: true
publisher-returns: true
listener:
simple:
acknowledge-mode: auto
prefetch: 15
template:
mandatory: false
acknowledge-mode说明
1, 有ack模式(AcknowledgeMode.AUTO,AcknowledgeMode.MANUAL)
AcknowledgeMode.AUTO
模式下,由spring-rabbit
依据消息处理逻辑是否抛出异常自动发送ack
(无异常)或nack
(异常)到server
端。
自动确认ack,消费者代码参考:
@RabbitHandler
public void consumer(String message){
try {
JSONObject messageJSON = JSON.parseObject(message);
EventIdentity eventId = EventIdentity.valueOf(messageJSON.getString("eventId"));
BaseEventMessageVO<?> eventMessageVO = eventId.parseJSONToVO(message);
if (eventMessageVO != null)
onMessage(eventMessageVO);
else
log.debug("解析message失败,无法取得合适的对象!!,message:{}",message);
} catch (Exception e) {
//捕获异常,保证ack消息正常确认,防止出现消息循环消费并堵塞队列
log.error("queens xxx 消费失败!!,message:{}",message,e);
}
}
AcknowledgeMode.MANUAL
模式需要人为地获取到channel
之后调用方法向server
发送ack
(或消费失败时的nack
)信息。
手动确认ack,消费者代码参考:
public void receive(Message message, Channel channel) throws IOException {
log.info("BaseIMReceiver.receive,mq头信息:{}", message.getMessageProperties());
log.info("收到的mq消息:{}", message.getBody());
IMMQMessage immqMessage = null;
String msg = null;
try {
String body = new String(message.getBody(), "utf-8");
immqMessage = JSON.parseObject(body, IMMQMessage.class);
msg = immqMessage.getBody();
log.info("接收转换的body消息:{}", msg);
} catch (UnsupportedEncodingException e) {
log.error("消息格式不正确,error msg:{}", message.getBody());
}
if (!StringUtils.hasText(msg)) {
//ack返回true,告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.warn("转换mq的message出问题,将错误格式的消息从队列中清除!!,message:{}", message.getBody());
return;
}
boolean ack = true;
Exception exception = null;
if (immqMessage.getRetrySize() < 3) {
try {
processing(msg);
ack = true;
} catch (Exception e) {
ack = false;
exception = e;
}
} else {
log.warn("此消息重试3次仍然请求失败!!,message:{}", msg);
try {
saveRetryFailMessage(msg);
} catch (Exception e) {
log.error("保存重试3次仍然失败的消息出错!!,message:{}", msg, e);
}
}
if (!ack) {
log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
server端行为
- rabbitmq server推送给每个channel的消息数量有限制,会保证每个channel没有收到ack的消息数量不会超过prefetchCount。
- server端会暂存没有收到ack的消息,等消费端ack后才会丢掉;如果收到消费端的nack(消费失败的标识)或connection断开没收到反馈,会将消息放回到原队列头部。
这种模式不会丢消息,但效率较低,因为server端需要等收到消费端的答复之后才会继续推送消息,当然,推送消息和等待答复是异步的,可适当增大prefetchCount提高效率。
注意,有ack的模式下,需要考虑setDefaultRequeueRejected(false),否则当消费消息抛出异常没有catch住时,这条消息会被rabbitmq放回到queue头部,再被推送过来,然后再抛异常再放回…死循环了。设置false的作用是抛异常时不放回,而是直接丢弃,所以可能需要对这条消息做处理,以免丢失。更详细的配置参考这里。
2, 无ack模式(AcknowledgeMode.NONE)
server
端行为
rabbitmq server
默认推送的所有消息都认为已经消费成功,会无脑的向消费端推送消息。- 因为
rabbitmq server
认为推送的消息已被成功消费,所以推送出去的消息不会暂存在server
端。
消息丢失的风险
当BlockingQueue堆满时(BlockingQueue一定会先满),server端推送消息会失败,然后断开connection。消费端从Socket读取Frame将会抛出SocketException,触发异常处理,shutdown掉connection和所有的channel,channel shutdown后WorkPool中的channel信息(包括channel inProgress,channel ready以及Map)全部清空,所以BlockingQueue中的数据会全部丢失。
此外,服务重启时也需对内存中未处理完的消息做必要的处理,以免丢失。
而在rabbitmq server,connection断掉后就没有消费者去消费这个queue,因此在server端会看到消息堆积的现象。
对比
- 无
ack
模式:效率高,存在丢失大量消息的风险。 - 有
ack
模式:效率低,不会丢消息。