RabbitMQ多源配置及ack相关使用
RabbitMQ多源配置及ack相关使用
pom.xml:
<!--RabbitMQ依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQConfig.java:
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Slf4j
@Configuration
public class RabbitMQConfig {
    // mq主连接
    @Bean(name = "myConnectionFactory")
    @Primary
    public CachingConnectionFactory publicConnectionFactory(
            @Value("${v1.my.spring.rabbitmq.host}") String host,
            @Value("${v1.my.spring.rabbitmq.port}") int port,
            @Value("${v1.my.spring.rabbitmq.username}") String username,
            @Value("${v1.my.spring.rabbitmq.password}") String password,
            @Value("${v1.my.spring.rabbitmq.virtual-host}") String virtualHost,
            @Value("${v1.my.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
            @Value("${v1.my.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setPublisherReturns(publisherReturns);
        return connectionFactory;
    }
    @Bean(name = "myRabbitTemplate")
    @Primary
    public RabbitTemplate publicRabbitTemplate(
            @Qualifier("myConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v1.my.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
        RabbitTemplate myRabbitTemplate = new RabbitTemplate(connectionFactory);
        myRabbitTemplate.setMandatory(mandatory);
        myRabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
            if (!ack) {
                    log.info("{} 发送RabbitMQ消息 ack确认 失败: [{}]", s, JSON.toJSONString(correlationData));
            } else {
                    log.info("{} 发送RabbitMQ消息 ack确认 成功: [{}]", s, JSON.toJSONString(correlationData));
            }
        });
        myRabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
//                LOGGER.error("{} 发送RabbitMQ消息returnedMessage,出现异常,Exchange不存在或发送至Exchange却没有发送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
        });
        return myRabbitTemplate;
    }
    @Bean(name = "myContainerFactory")
    @Primary
    public SimpleRabbitListenerContainerFactory insMessageListenerContainer(
            @Qualifier("myConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v1.my.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
            @Value("${v1.my.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
        factory.setPrefetchCount(prefetch);
        return factory;
    }
    @Bean(name = "myRabbitAdmin")
    @Primary
    public RabbitAdmin publicRabbitAdmin(
            @Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
    
    //如果有多mq源,则把下面代码打开
    
//    @Bean(name = "v2ConnectionFactory")
//    public CachingConnectionFactory hospSyncConnectionFactory(
//            @Value("${v2.spring.rabbitmq.host}") String host,
//            @Value("${v2.spring.rabbitmq.port}") int port,
//            @Value("${v2.spring.rabbitmq.username}") String username,
//            @Value("${v2.spring.rabbitmq.password}") String password,
//            @Value("${v2.spring.rabbitmq.virtual-host}") String virtualHost,
//            @Value("${v2.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
//            @Value("${v2.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
//        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
//        connectionFactory.setHost(host);
//        connectionFactory.setPort(port);
//        connectionFactory.setUsername(username);
//        connectionFactory.setPassword(password);
//        connectionFactory.setVirtualHost(virtualHost);
//        connectionFactory.setPublisherConfirms(publisherConfirms);
//        connectionFactory.setPublisherReturns(publisherReturns);
//        return connectionFactory;
//    }
//
//    @Bean(name = "v2RabbitTemplate")
//    public RabbitTemplate firstRabbitTemplate(
//            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
//            @Value("${v2.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
//        RabbitTemplate v2RabbitTemplate = new RabbitTemplate(connectionFactory);
//        v2RabbitTemplate.setMandatory(mandatory);
//        v2RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
//            if (!ack) {
////                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 失败: [{}]", this.name, JSON.toJSONString(object));
//            } else {
////                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 成功: [{}]", this.name, JSON.toJSONString(object));
//            }
//        });
//        v2RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
////                LOGGER.error("{} 发送RabbitMQ消息returnedMessage,出现异常,Exchange不存在或发送至Exchange却没有发送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
//        });
//        return v2RabbitTemplate;
//    }
//
//    @Bean(name = "v2ContainerFactory")
//    public SimpleRabbitListenerContainerFactory hospSyncFactory(
//            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
//            @Value("${v2.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
//            @Value("${v2.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch
//    ) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConnectionFactory(connectionFactory);
//        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
//        factory.setPrefetchCount(prefetch);
//        return factory;
//    }
//
//    @Bean(name = "v2RabbitAdmin")
//    public RabbitAdmin iqianzhanRabbitAdmin(
//            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory) {
//        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
//        rabbitAdmin.setAutoStartup(true);
//        return rabbitAdmin;
//    }
    
}
application.yaml:
v1:	
    my:
          spring:
        rabbitmq:
          host: xxx.com
          port: 5672
          username: xxx
          password: xxx
          virtual-host: xxx
          publisher-confirms: true
          publisher-returns: true
          listener:
            simple:
              acknowledge-mode: auto
              prefetch: 15
          template:
            mandatory: false
acknowledge-mode说明
1, 有ack模式(AcknowledgeMode.AUTO,AcknowledgeMode.MANUAL)
AcknowledgeMode.AUTO模式下,由spring-rabbit依据消息处理逻辑是否抛出异常自动发送ack(无异常)或nack(异常)到server端。
自动确认ack,消费者代码参考:
@RabbitHandler
    public void consumer(String message){
        try {
            JSONObject messageJSON = JSON.parseObject(message);
            EventIdentity eventId = EventIdentity.valueOf(messageJSON.getString("eventId"));
            BaseEventMessageVO<?> eventMessageVO = eventId.parseJSONToVO(message);
            if (eventMessageVO != null)
                onMessage(eventMessageVO);
            else
                log.debug("解析message失败,无法取得合适的对象!!,message:{}",message);
        } catch (Exception e) {
            //捕获异常,保证ack消息正常确认,防止出现消息循环消费并堵塞队列
            log.error("queens xxx 消费失败!!,message:{}",message,e);
        }
    }
AcknowledgeMode.MANUAL模式需要人为地获取到channel之后调用方法向server发送ack(或消费失败时的nack)信息。
手动确认ack,消费者代码参考:
public void receive(Message message, Channel channel) throws IOException {
        log.info("BaseIMReceiver.receive,mq头信息:{}", message.getMessageProperties());
        log.info("收到的mq消息:{}", message.getBody());
        IMMQMessage immqMessage = null;
        String msg = null;
        try {
            String body = new String(message.getBody(), "utf-8");
            immqMessage = JSON.parseObject(body, IMMQMessage.class);
            msg = immqMessage.getBody();
            log.info("接收转换的body消息:{}", msg);
        } catch (UnsupportedEncodingException e) {
            log.error("消息格式不正确,error msg:{}", message.getBody());
        }
        if (!StringUtils.hasText(msg)) {
            //ack返回true,告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.warn("转换mq的message出问题,将错误格式的消息从队列中清除!!,message:{}", message.getBody());
            return;
        }
        boolean ack = true;
        Exception exception = null;
        if (immqMessage.getRetrySize() < 3) {
            try {
                processing(msg);
                ack = true;
            } catch (Exception e) {
                ack = false;
                exception = e;
            }
        } else {
            log.warn("此消息重试3次仍然请求失败!!,message:{}", msg);
            try {
                saveRetryFailMessage(msg);
            } catch (Exception e) {
                log.error("保存重试3次仍然失败的消息出错!!,message:{}", msg, e);
            }
        }
        if (!ack) {
            log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
server端行为
- rabbitmq server推送给每个channel的消息数量有限制,会保证每个channel没有收到ack的消息数量不会超过prefetchCount。
 - server端会暂存没有收到ack的消息,等消费端ack后才会丢掉;如果收到消费端的nack(消费失败的标识)或connection断开没收到反馈,会将消息放回到原队列头部。
 
这种模式不会丢消息,但效率较低,因为server端需要等收到消费端的答复之后才会继续推送消息,当然,推送消息和等待答复是异步的,可适当增大prefetchCount提高效率。
注意,有ack的模式下,需要考虑setDefaultRequeueRejected(false),否则当消费消息抛出异常没有catch住时,这条消息会被rabbitmq放回到queue头部,再被推送过来,然后再抛异常再放回…死循环了。设置false的作用是抛异常时不放回,而是直接丢弃,所以可能需要对这条消息做处理,以免丢失。更详细的配置参考这里。
2, 无ack模式(AcknowledgeMode.NONE)
server端行为
rabbitmq server默认推送的所有消息都认为已经消费成功,会无脑的向消费端推送消息。- 因为
rabbitmq server认为推送的消息已被成功消费,所以推送出去的消息不会暂存在server端。 
消息丢失的风险
当BlockingQueue堆满时(BlockingQueue一定会先满),server端推送消息会失败,然后断开connection。消费端从Socket读取Frame将会抛出SocketException,触发异常处理,shutdown掉connection和所有的channel,channel shutdown后WorkPool中的channel信息(包括channel inProgress,channel ready以及Map)全部清空,所以BlockingQueue中的数据会全部丢失。
此外,服务重启时也需对内存中未处理完的消息做必要的处理,以免丢失。
而在rabbitmq server,connection断掉后就没有消费者去消费这个queue,因此在server端会看到消息堆积的现象。
对比
- 无
ack模式:效率高,存在丢失大量消息的风险。 - 有
ack模式:效率低,不会丢消息。 

