文章目录
  1. 1. RabbitMQ多源配置及ack相关使用
    1. 1.0.0.1. server端行为
      1. 1.0.0.1.1. server端行为
    2. 1.0.0.2. 消息丢失的风险
  • 1.1. 对比
  • RabbitMQ多源配置及ack相关使用

    pom.xml:

    <!--RabbitMQ依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    RabbitMQConfig.java:

    
    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    
    @Slf4j
    @Configuration
    public class RabbitMQConfig {
    
        // mq主连接
        @Bean(name = "myConnectionFactory")
        @Primary
        public CachingConnectionFactory publicConnectionFactory(
                @Value("${v1.my.spring.rabbitmq.host}") String host,
                @Value("${v1.my.spring.rabbitmq.port}") int port,
                @Value("${v1.my.spring.rabbitmq.username}") String username,
                @Value("${v1.my.spring.rabbitmq.password}") String password,
                @Value("${v1.my.spring.rabbitmq.virtual-host}") String virtualHost,
                @Value("${v1.my.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
                @Value("${v1.my.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost(host);
            connectionFactory.setPort(port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            connectionFactory.setPublisherConfirms(publisherConfirms);
            connectionFactory.setPublisherReturns(publisherReturns);
            return connectionFactory;
        }
    
        @Bean(name = "myRabbitTemplate")
        @Primary
        public RabbitTemplate publicRabbitTemplate(
                @Qualifier("myConnectionFactory") ConnectionFactory connectionFactory,
                @Value("${v1.my.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
            RabbitTemplate myRabbitTemplate = new RabbitTemplate(connectionFactory);
            myRabbitTemplate.setMandatory(mandatory);
            myRabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
                if (!ack) {
                        log.info("{} 发送RabbitMQ消息 ack确认 失败: [{}]", s, JSON.toJSONString(correlationData));
                } else {
                        log.info("{} 发送RabbitMQ消息 ack确认 成功: [{}]", s, JSON.toJSONString(correlationData));
                }
            });
            myRabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
    //                LOGGER.error("{} 发送RabbitMQ消息returnedMessage,出现异常,Exchange不存在或发送至Exchange却没有发送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
            });
            return myRabbitTemplate;
        }
    
        @Bean(name = "myContainerFactory")
        @Primary
        public SimpleRabbitListenerContainerFactory insMessageListenerContainer(
                @Qualifier("myConnectionFactory") ConnectionFactory connectionFactory,
                @Value("${v1.my.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
                @Value("${v1.my.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
            factory.setPrefetchCount(prefetch);
            return factory;
        }
    
        @Bean(name = "myRabbitAdmin")
        @Primary
        public RabbitAdmin publicRabbitAdmin(
                @Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
        
    
        //如果有多mq源,则把下面代码打开
        
    //    @Bean(name = "v2ConnectionFactory")
    //    public CachingConnectionFactory hospSyncConnectionFactory(
    //            @Value("${v2.spring.rabbitmq.host}") String host,
    //            @Value("${v2.spring.rabbitmq.port}") int port,
    //            @Value("${v2.spring.rabbitmq.username}") String username,
    //            @Value("${v2.spring.rabbitmq.password}") String password,
    //            @Value("${v2.spring.rabbitmq.virtual-host}") String virtualHost,
    //            @Value("${v2.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
    //            @Value("${v2.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
    //        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    //        connectionFactory.setHost(host);
    //        connectionFactory.setPort(port);
    //        connectionFactory.setUsername(username);
    //        connectionFactory.setPassword(password);
    //        connectionFactory.setVirtualHost(virtualHost);
    //        connectionFactory.setPublisherConfirms(publisherConfirms);
    //        connectionFactory.setPublisherReturns(publisherReturns);
    //        return connectionFactory;
    //    }
    //
    //    @Bean(name = "v2RabbitTemplate")
    //    public RabbitTemplate firstRabbitTemplate(
    //            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
    //            @Value("${v2.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
    //        RabbitTemplate v2RabbitTemplate = new RabbitTemplate(connectionFactory);
    //        v2RabbitTemplate.setMandatory(mandatory);
    //        v2RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
    //            if (!ack) {
    ////                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 失败: [{}]", this.name, JSON.toJSONString(object));
    //            } else {
    ////                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 成功: [{}]", this.name, JSON.toJSONString(object));
    //            }
    //        });
    //        v2RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
    ////                LOGGER.error("{} 发送RabbitMQ消息returnedMessage,出现异常,Exchange不存在或发送至Exchange却没有发送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
    //        });
    //        return v2RabbitTemplate;
    //    }
    //
    //    @Bean(name = "v2ContainerFactory")
    //    public SimpleRabbitListenerContainerFactory hospSyncFactory(
    //            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
    //            @Value("${v2.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
    //            @Value("${v2.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch
    //    ) {
    //        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    //        factory.setConnectionFactory(connectionFactory);
    //        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
    //        factory.setPrefetchCount(prefetch);
    //        return factory;
    //    }
    //
    //    @Bean(name = "v2RabbitAdmin")
    //    public RabbitAdmin iqianzhanRabbitAdmin(
    //            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory) {
    //        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    //        rabbitAdmin.setAutoStartup(true);
    //        return rabbitAdmin;
    //    }
    
    
        
    
    }
    

    application.yaml:

    v1:	
        my:
              spring:
            rabbitmq:
              host: xxx.com
              port: 5672
              username: xxx
              password: xxx
              virtual-host: xxx
              publisher-confirms: true
              publisher-returns: true
              listener:
                simple:
                  acknowledge-mode: auto
                  prefetch: 15
              template:
                mandatory: false
    

    acknowledge-mode说明

    1, 有ack模式(AcknowledgeMode.AUTO,AcknowledgeMode.MANUAL)

    • AcknowledgeMode.AUTO模式下,由spring-rabbit依据消息处理逻辑是否抛出异常自动发送ack(无异常)或nack(异常)到server端。

    自动确认ack,消费者代码参考:

    @RabbitHandler
        public void consumer(String message){
            try {
                JSONObject messageJSON = JSON.parseObject(message);
                EventIdentity eventId = EventIdentity.valueOf(messageJSON.getString("eventId"));
                BaseEventMessageVO<?> eventMessageVO = eventId.parseJSONToVO(message);
                if (eventMessageVO != null)
                    onMessage(eventMessageVO);
                else
                    log.debug("解析message失败,无法取得合适的对象!!,message:{}",message);
            } catch (Exception e) {
                //捕获异常,保证ack消息正常确认,防止出现消息循环消费并堵塞队列
                log.error("queens xxx 消费失败!!,message:{}",message,e);
            }
    
        }
    
    • AcknowledgeMode.MANUAL模式需要人为地获取到channel之后调用方法向server发送ack(或消费失败时的nack)信息。

    手动确认ack,消费者代码参考:

    public void receive(Message message, Channel channel) throws IOException {
            log.info("BaseIMReceiver.receive,mq头信息:{}", message.getMessageProperties());
            log.info("收到的mq消息:{}", message.getBody());
            IMMQMessage immqMessage = null;
            String msg = null;
    
            try {
                String body = new String(message.getBody(), "utf-8");
                immqMessage = JSON.parseObject(body, IMMQMessage.class);
                msg = immqMessage.getBody();
                log.info("接收转换的body消息:{}", msg);
            } catch (UnsupportedEncodingException e) {
                log.error("消息格式不正确,error msg:{}", message.getBody());
            }
    
            if (!StringUtils.hasText(msg)) {
                //ack返回true,告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                log.warn("转换mq的message出问题,将错误格式的消息从队列中清除!!,message:{}", message.getBody());
                return;
            }
    
            boolean ack = true;
            Exception exception = null;
            if (immqMessage.getRetrySize() < 3) {
                try {
                    processing(msg);
                    ack = true;
                } catch (Exception e) {
                    ack = false;
                    exception = e;
                }
            } else {
                log.warn("此消息重试3次仍然请求失败!!,message:{}", msg);
                try {
                    saveRetryFailMessage(msg);
                } catch (Exception e) {
                    log.error("保存重试3次仍然失败的消息出错!!,message:{}", msg, e);
                }
            }
    
            if (!ack) {
                log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } else {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        }
    

    server端行为

    • rabbitmq server推送给每个channel的消息数量有限制,会保证每个channel没有收到ack的消息数量不会超过prefetchCount。
    • server端会暂存没有收到ack的消息,等消费端ack后才会丢掉;如果收到消费端的nack(消费失败的标识)或connection断开没收到反馈,会将消息放回到原队列头部。

    这种模式不会丢消息,但效率较低,因为server端需要等收到消费端的答复之后才会继续推送消息,当然,推送消息和等待答复是异步的,可适当增大prefetchCount提高效率。

    注意,有ack的模式下,需要考虑setDefaultRequeueRejected(false),否则当消费消息抛出异常没有catch住时,这条消息会被rabbitmq放回到queue头部,再被推送过来,然后再抛异常再放回…死循环了。设置false的作用是抛异常时不放回,而是直接丢弃,所以可能需要对这条消息做处理,以免丢失。更详细的配置参考这里

    2, 无ack模式(AcknowledgeMode.NONE)

    server端行为
    • rabbitmq server默认推送的所有消息都认为已经消费成功,会无脑的向消费端推送消息。
    • 因为rabbitmq server认为推送的消息已被成功消费,所以推送出去的消息不会暂存在server端。

    消息丢失的风险

    当BlockingQueue堆满时(BlockingQueue一定会先满),server端推送消息会失败,然后断开connection。消费端从Socket读取Frame将会抛出SocketException,触发异常处理,shutdown掉connection和所有的channel,channel shutdown后WorkPool中的channel信息(包括channel inProgress,channel ready以及Map)全部清空,所以BlockingQueue中的数据会全部丢失。

    此外,服务重启时也需对内存中未处理完的消息做必要的处理,以免丢失。

    而在rabbitmq server,connection断掉后就没有消费者去消费这个queue,因此在server端会看到消息堆积的现象。

    对比

    • ack模式:效率高,存在丢失大量消息的风险。
    • ack模式:效率低,不会丢消息。

    参考文章

    源代码

    文章目录
    1. 1. RabbitMQ多源配置及ack相关使用
      1. 1.0.0.1. server端行为
        1. 1.0.0.1.1. server端行为
      2. 1.0.0.2. 消息丢失的风险
  • 1.1. 对比