开发

如何通过SpringBoot + RabbitMQ保证消息100%投递成功并被消费?(附子系统) IT综合

一,先扔一张图

说明:

本文涵盖了关于RabbitMQ很多方面的知识点,例如:

  • 消息发送确认机制
  • 消费确认机制
  • 消息的重新投递
  • 消费幂等性,等等

这些都是围绕上面那张整体流程图展开的,所以有必要先贴出来,认为知意

二,实现思路

  • 简略介绍163邮箱授权码的获取
  • 编写发送邮件工具类
  • 编写RabbitMQ配置文件
  • 生产者发起呼吁
  • 消费者发送邮件
  • 定时任务定时拉取投递失败的消息,重新投递
  • 各种异常情况的测试验证

扩展:使用动态代理实现消费端幂等性验证和消息确认(ack)

三,项目介绍

  • springboot版本2.1.5.RELEASE,旧版本可能有些配置属性不能使用,需要以代码形式进行配置
  • RabbitMQ版本3.7.15
  • MailUtil:发送邮件工具类
  • RabbitConfig:Rabbitmq相关配置
  • TestServiceImpl:生产者,发送消息
  • MailConsumer:消费者,消费消息,发送邮件
  • ResendMsg:定时任务,重新投递发送失败的消息

说明:上面是核心代码,MsgLogService映射器xml等均未贴出,完整代码可以参考GitHub上的源码,地址在文末。

四,代码实现

1.163邮箱授权码的获取,如下:

该授权码就是配置文件spring.mail.password需要的密码

2.pom

        <dependency>
            <groupId>org.springframework.boot<groupId>
            <artifactId>spring-boot-starter-amqp<artifactId>
        <dependency>
        
        <dependency>
            <groupId>org.springframework.boot<groupId>
            <artifactId>spring-boot-starter-mail<artifactId>
        <dependency>

3.rabbitmq,邮箱配置

#RabbitMQ的
spring.rabbitmq.host =本地主机
spring.rabbitmq.port = 5672
spring.rabbitmq.username =来宾
spring.rabbitmq.password =来宾
#开启确认回调P - >交易所
spring.rabbitmq.publisher-确认= 真
#开启returnedMessage交换->队列spring.rabbitmq.publisher
-returns = true
#设置手动确认(ack)队列-> C
spring.rabbitmq.listener.simple.acknowledge-mode = manual
spring.rabbitmq.listener.simple.prefetch = 100

#邮件
spring.mail.host = smtp。163 .com
spring.mail.username = 18621142249 @ 163.com
spring.mail.password = 123456 wangzai
spring.mail.from = 18621142249 @ 163.com
spring.mail.properties.mail.smtp.auth = true
spring.mail.properties.mail.smtp.starttls.enable = true
spring.mail.properties.mail.smtp.starttls .required = true

说明:password即授权码,用户名和来自要一致

4.表结构

CREATE  TABLE  msg_log
msg_id VARCHAR(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',
msg 文本 COMMENT '消息体,JSON格式化',
exchange VARCHAR(255) NOT NULL DEFAULT “ “ COMMENT '交换机',
routing_key VARCHAR(255) NOT NULL DEFAULT '' COMMENT '路由键',
status INT(11) NOT NULL DEFAULT '0' COMMENT '状态:0投递中1投递成功2投递失败3已消费',
try_count INT(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
next_try_time 日期时间 DEFAULT NULL COMMENT '下一次重试时间',
'create_time ' 日期 时间DEFAULT NULL COMMENT '创建时间',
'update_timedatetime DEFAULT NULL COMMENT '更新时间', 主 键 (msg_id), UNIQUE KEYunq_msg_idmsg_id`) USING BTREE
) ENGINE = InnoDB的 DEFAULT CHARSET = utf8mb4 COMMENT = '消息投递日志' ;

说明:exchange routing_key远程是在定时任务重新投递消息时需要用到的

5,MailUtil

@Component 
@ Slf4j
公共 类 MailUtil  {

    @Value(“ $ {spring.mail.from} ”)
    私有  String from;

    @Autowired
    私有  JavaMailSender mailSender;

    / **
     *发送简单邮件
     *
     *  @param  mail
     * /
    public  boolean send(Mail mail){
        字符串为= mail.getTo(); //目标邮箱
        字符串title = mail.getTitle(); //邮件标题
        字符串content = mail.getContent(); //邮件正文

        SimpleMailMessage message = new SimpleMailMessage();
        message.setFrom(from);
        message.setTo(to);
        message.setSubject(title);
        message.setText(content);

        尝试  {
            mailSender.send(message);
            log.info(“邮件发送成功”);
            返回 true ;
        }  catch  (MailException e){
            log.error(“邮件发送失败,至:{},标题:{}”,至,标题,e);
            返回 false ;
        }
    }
}

6,RabbitConfig

@Configuration 
@Slf 4j
公共 类 RabbitConfig  {

    @Autowired
    私有  CachingConnectionFactory connectionFactory;

    @Autowired
    私人  MsgLogService msgLogService;

    @Bean
    public  RabbitTemplate  rabbitTemplate ()  {
        RabbitTemplate rabbitTemplate =  new  RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());

        // //消息是否成功发送到Exchange
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause)-> {
            if  (ack){
                log.info(“消息成功发送到Exchange”);
                字符串msgId = relatedData.getId();
                msgLogService.updateStatus(msgId,Constant.MsgLogStatus.DELIVER_SUCCESS);
            }  else  {
                log.info(“消息发送到Exchange失败,{},原因:{}”,correlationData,原因);
            }
        };

        // //触发setReturnCallback必须必须设置为true,否则Exchange没有找到队列就会放置掉消息,而不会触发
        rabbitTemplate.setMandatory(true);
        //消息是否从交流路由到排队机,注意:这是一个失败回调,只有消息从交换路由到队列失败才会回调这个方法
        rabbitTemplate.setReturnCallback((消息,replyCode,replyText,交换,routingKey) - > {
            日志。信息(“消息从Exchange路由到队列失败:exchange:{},route:{},replyCode:{},replyText:{},message:{}”,exchange,routingKey,replyCode,replyText,message);
        });

        返回  rabbitTemplate;
    }

    @Bean
    公共  Jackson2JsonMessageConverter  转换器()  {
        返回 新  Jackson2JsonMessageConverter();
    }

    //发送邮件
    public  static  final  String MAIL_QUEUE_NAME =  “ mail.queue” ;
    公共 静态 最终  字符串MAIL_EXCHANGE_NAME =  “ mail.exchange” ;
    公共 静态 决赛 字符串MAIL_ROUTING_KEY_NAME =  “ mail.routing.key” ;

    @Bean
    public  Queue  mailQueue ()  {
        return  new  Queue(MAIL_QUEUE_NAME,  true);
    }

    @Bean
    public  DirectExchange  mailExchange ()  {
        返回 新的  DirectExchange(MAIL_EXCHANGE_NAME,  true,  false);
    }

    @Bean
    public  Binding  mailBinding ()  {
        返回  BindingBuilder.bind(mailQueue())。to(mailExchange())。with(MAIL_ROUTING_KEY_NAME);
    }

}

7.TestServiceImpl生产消息

@Service 
公共 类 TestServiceImpl  实现 TestService  {

    @Autowired
    私有  MsgLogMapper msgLogMapper;

    @Autowired
    私有  RabbitTemplate RabbitTemplate;

    @Override
    public  ServerResponse  send (邮件)  {
        字符串msgId = RandomUtil.UUID32();
        mail.setMsgId(msgId);

        MsgLog msgLog =  新的  MsgLog(msgId,邮件,RabbitConfig.MAIL_EXCHANGE_NAME,RabbitConfig.MAIL_ROUTING_KEY_NAME);
        msgLogMapper.insert(msgLog); //消息入库

        CorrelationData relativeData =  new CorrelationData(msgId);
        rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME,RabbitConfig.MAIL_ROUTING_KEY_NAME,MessageHelper.objToMsg(mail),correlationData);//发送消息

        返回  ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());
    }

}

8.MailConsumer消费消息,发送邮件

@Component 
@Slf 4j
公共 类 MailConsumer  {

    @Autowired
    私有  MsgLogService msgLogService;

    @Autowired
    私人  MailUtil mailUtil;

    @RabbitListener(队列= RabbitConfig.MAIL_QUEUE_NAME)
    公共 无效 消耗(消息消息,通道通道) 抛出  IOException  {
        邮件= MessageHelper.msgToObj(message,Mail.class);
        log.info(“收到消息:{}”,mail.toString());

        字符串msgId = mail.getMsgId();

        MsgLog msgLog = msgLogService.selectByMsgId(msgId);
        如果  (null  == msgLog || msgLog.getStatus()。equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)){ //消费幂等性
            log.info(“重复消费,msgId:{}”,msgId);
            回报 ;
        }

        MessageProperties属性= message.getMessageProperties();
        长  标签= properties.getDeliveryTag();

        布尔  成功= mailUtil.send(mail);
        如果  (成功){
            msgLogService.updateStatus(msgId,Constant.MsgLogStatus.CONSUMED_SUCCESS);
            channel.basicAck(tag,  false); //消费确认
        }  else  {
            channel.basicNack(tag,  false,  true);
        }
    }

}

说明:其实就完成了3件事:1.保证消费幂等性,2.发送邮件,3.更新消息状态,手动ack

9.ResendMsg定时任务重新投递发送失败的消息

@Component 
@Slf 4j
公共 类 ResendMsg  {

    @Autowired
    私有  MsgLogService msgLogService;

    @Autowired
    私有  RabbitTemplate RabbitTemplate;

    //最大投递次数
    private  static  final  int  MAX_TRY_COUNT =  3 ;

    / **
     *每30秒拉取投递失败的消息,重新投递
     * /
    @Scheduled(cron =  “ 0/30 * * * *?”)
    public  void  resend ()  {
        log.info(“开始执行定时任务(重新投递消息)“);

        清单

说明:每一条消息都和交换路由密钥绑定,所有消息重投共用这一个定时任务即可

五,基本测试

OK,目前为止,代码准备就绪,现在进行正常流程的测试

1.发送请求:

2.后台日志:

3.数据库消息记录:

状态为3,表明已消费,消息重试次数为0,表明一次投递就成功了

4.查看邮箱

发送成功

六,各种异常情况测试

步骤一罗列了很多关于RabbitMQ的知识点,很重要,很核心,而这里也涉及到了这些知识点的实现,接下来就通过异常测试进行验证(这些验证都是围绕此处开头的那张流程图展开的,很重要,所以,再贴一遍)

1.验证消息发送到Exchange失败情况下的对应,对应上图P-> X

如何验证?可以随便指定一个不存在的交换机名称,请求接口,看是否会触发切换

发送失败,原因:回复代码= 404,回复文本= NOT_FOUND-虚拟主机’/’中没有交换’mail.exchangeabcd’,该能够保证消息正确发送到Exchange,测试完成

2.验证消息从Exchange路由到队列失败情况下的相应,对应上图X-> Q

同理,修改一下路由键为不存在的即可,路由失败,触发重定向

发送失败,原因:route:mail.routing.keyabcd,replyCode:312,replyText:NO_ROUTE

3.验证在手动ack模式下,消费端必须进行手动确认(ack),否则消息会一直保存在其中中,直到被消费,对应上图Q-> C

将消费端代码channel.basicAck(tag,false); //消费确认注释掉,查看控制台和rabbitmq管控台

可以看到,虽然消息确实被消费了,但是由于是手动确认模式,而最后又没手动确认,所以,消息仍被rabbitmq保存,所以,手动ack能够保证消息一定被消费,但一定要记得basicAck

4.验证消费端幂等性

接下来上一步,去掉注释,重启服务器,由于有一条错误ack的消息,所以重启后监听到消息,进行消费,但是由于消费前会判断该消息的状态是否可以消费,发现status = 3,即已消费,所以,直接返回,这样就保证了消费端的幂等性,即使由于网络等原因投递成功而未触发转换,从而多次投递,也不会重复消费且发生业务异常

5.验证消费端发生异常消息也不会丢失

很明显,消费端代码可能发生异常,如果不做处理,业务没正确执行,消息却不见了,给我们感觉就是消息丢失了,由于我们消费端代码导致异常捕获,业务异常时,会触发: channel.basicNack(tag,false,true); 这样会告诉告诉rabbitmq该消息消费失败,需要重新入队,可以重新投递到其他正常的消费端进行消费,从而保证消息不被丢失

测试:send方法直接返回false即可(这里跟引发异常一个意思)

可以看到,由于channel.basicNack(tag,false,true),知道ack的消息(unacked)会重新入队并被消费,这样就保证了消息不会走丢

6.验证定时任务的消息重投

实际应用场景中,可能由于网络原因,或消息持续持久化MQ就停机机了,造成投递确认的替代方法ConfirmCallback没有被执行,从而导致数据库该消息状态一直是投递中的状态,此时就需要进行消息重投,甚至也许消息已经被消费了

定时任务只是保证消息100%投递成功,而多次投递的消费幂等性需要消费端自己保证

我们可以将至少和消费成功后更新消息状态的代码注释掉,开启定时任务,查看是否重投

可以看到,消息会重投3次,超过3次放弃,将消息状态放置为投递失败状态,出现这种非正常情况,就需要人工介入排查原因

七,拓展:使用动态代理实现消费端幂等性验证和消费确认(ack)

不知道大家发现没有,在MailConsumer中,真正的业务逻辑其实只是发送邮件为“已消费”状态,并手动确认,实际项目中,可能还有很多生产者-消费者的应用场景,如记录日志,发送短信等等,都需要rabbitmq,如果每次都写这些重复的专用代码,没必要,也难以维护,所以,我们可以将公共代码抽离出来,让核心业务逻辑只关心自己的实现,而不用做其他操作,其实就是AOP

为达到这个目的,有很多方法,可以用spring aop,可以用拦截器,可以用静态代理,也可以用动态代理,在这里,我用的是动态代理

目录结构如下:

核心代码就是代理的实现,此处就不把所有代码贴出来了,只是提供一个思路,我们要进行地把代码写的更简洁更优雅

八,总结

发送邮件其实很简单,但深究起来其实有很多需要注意和完善的点,一个看似很小的知识点,也可以引申出很多问题,甚至涉及到方方面面,这些都需要自己踩坑,当然我这代码肯定还有很多不完善和需要优化的点,希望小伙伴多多提意见和建议

我的代码都是经过自测验证过的,图也都是一点一点自己画的或认真截的,希望小伙伴能学到一点东西,路过的点个赞或点个关注呗,谢谢

RabbitMQ相关知识请参考

https://www.jianshu.com/p/cc3d2017e7b3

Linux安装RabbitMQ请参考

https://www.jianshu.com/p/ee9f7594212b

Windows安装RabbitMQ请参考

https://www.jianshu.com/p/c7726ba4b046

项目Github,欢迎叉

https://github.com/wangzaiplus/springboot/tree/wxw

来源:

www.jianshu.com/p/dca01aad6bc8

我还没有学会写个人说明!

最佳实践:金融级企业研发中自动和智能SQL质量控制

上一篇

从Gartner LAN魔力象限,看网络厂商的起起伏伏

下一篇

你也可能喜欢

如何通过SpringBoot + RabbitMQ保证消息100%投递成功并被消费?(附子系统) IT综合

长按储存图像,分享给朋友

ITPUB 每周精要将以邮件的形式发放至您的邮箱


微信扫一扫

微信扫一扫