一,先扔一张图

说明:
本文涵盖了关于RabbitMQ很多方面的知识点,例如:
- 消息发送确认机制
- 消费确认机制
- 消息的重新投递
- 消费幂等性,等等
这些都是围绕上面那张整体流程图展开的,所以有必要先贴出来,认为知意
二,实现思路
- 简略介绍163邮箱授权码的获取
- 编写发送邮件工具类
- 编写RabbitMQ配置文件
- 生产者发起呼吁
- 消费者发送邮件
- 定时任务定时拉取投递失败的消息,重新投递
- 各种异常情况的测试验证
扩展:使用动态代理实现消费端幂等性验证和消息确认(ack)
三,项目介绍
- springboot版本2.1.5.RELEASE,旧版本可能有些配置属性不能使用,需要以代码形式进行配置
- RabbitMQ版本3.7.15
- MailUtil:发送邮件工具类
- RabbitConfig:Rabbitmq相关配置
- TestServiceImpl:生产者,发送消息
- MailConsumer:消费者,消费消息,发送邮件
- ResendMsg:定时任务,重新投递发送失败的消息
说明:上面是核心代码,MsgLogService映射器xml等均未贴出,完整代码可以参考GitHub上的源码,地址在文末。
四,代码实现
1.163邮箱授权码的获取,如下:

该授权码就是配置文件spring.mail.password需要的密码
2.pom
<dependency>
<groupId>org.springframework.boot<groupId>
<artifactId>spring-boot-starter-amqp<artifactId>
<dependency>
<dependency>
<groupId>org.springframework.boot<groupId>
<artifactId>spring-boot-starter-mail<artifactId>
<dependency>
3.rabbitmq,邮箱配置
#RabbitMQ的
spring.rabbitmq.host =本地主机
spring.rabbitmq.port = 5672
spring.rabbitmq.username =来宾
spring.rabbitmq.password =来宾
#开启确认回调P - >交易所
spring.rabbitmq.publisher-确认= 真
#开启returnedMessage交换->队列spring.rabbitmq.publisher
-returns = true
#设置手动确认(ack)队列-> C
spring.rabbitmq.listener.simple.acknowledge-mode = manual
spring.rabbitmq.listener.simple.prefetch = 100
#邮件
spring.mail.host = smtp。163 .com
spring.mail.username = 18621142249 @ 163.com
spring.mail.password = 123456 wangzai
spring.mail.from = 18621142249 @ 163.com
spring.mail.properties.mail.smtp.auth = true
spring.mail.properties.mail.smtp.starttls.enable = true
spring.mail.properties.mail.smtp.starttls .required = true
说明:password即授权码,用户名和来自要一致
4.表结构
CREATE TABLEmsg_log
(
msg_id
VARCHAR(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',
msg
文本 COMMENT '消息体,JSON格式化',
exchange
VARCHAR(255) NOT NULL DEFAULT “ “ COMMENT '交换机',
routing_key
VARCHAR(255) NOT NULL DEFAULT '' COMMENT '路由键',
status
INT(11) NOT NULL DEFAULT '0' COMMENT '状态:0投递中1投递成功2投递失败3已消费',
try_count
INT(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
next_try_time
日期时间 DEFAULT NULL COMMENT '下一次重试时间',
'create_time ' 日期 时间DEFAULT NULL COMMENT '创建时间',
'update_timedatetime DEFAULT NULL COMMENT '更新时间', 主 键 (
msg_id), UNIQUE KEY
unq_msg_id(
msg_id`) USING BTREE
) ENGINE = InnoDB的 DEFAULT CHARSET = utf8mb4 COMMENT = '消息投递日志' ;
说明:exchange routing_key远程是在定时任务重新投递消息时需要用到的
5,MailUtil
@Component
@ Slf4j
公共 类 MailUtil {
@Value(“ $ {spring.mail.from} ”)
私有 String from;
@Autowired
私有 JavaMailSender mailSender;
/ **
*发送简单邮件
*
* @param mail
* /
public boolean send(Mail mail){
字符串为= mail.getTo(); //目标邮箱
字符串title = mail.getTitle(); //邮件标题
字符串content = mail.getContent(); //邮件正文
SimpleMailMessage message = new SimpleMailMessage();
message.setFrom(from);
message.setTo(to);
message.setSubject(title);
message.setText(content);
尝试 {
mailSender.send(message);
log.info(“邮件发送成功”);
返回 true ;
} catch (MailException e){
log.error(“邮件发送失败,至:{},标题:{}”,至,标题,e);
返回 false ;
}
}
}
6,RabbitConfig
@Configuration
@Slf 4j
公共 类 RabbitConfig {
@Autowired
私有 CachingConnectionFactory connectionFactory;
@Autowired
私人 MsgLogService msgLogService;
@Bean
public RabbitTemplate rabbitTemplate () {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(converter());
// //消息是否成功发送到Exchange
rabbitTemplate.setConfirmCallback((correlationData,ack,cause)-> {
if (ack){
log.info(“消息成功发送到Exchange”);
字符串msgId = relatedData.getId();
msgLogService.updateStatus(msgId,Constant.MsgLogStatus.DELIVER_SUCCESS);
} else {
log.info(“消息发送到Exchange失败,{},原因:{}”,correlationData,原因);
}
};
// //触发setReturnCallback必须必须设置为true,否则Exchange没有找到队列就会放置掉消息,而不会触发
rabbitTemplate.setMandatory(true);
//消息是否从交流路由到排队机,注意:这是一个失败回调,只有消息从交换路由到队列失败才会回调这个方法
rabbitTemplate.setReturnCallback((消息,replyCode,replyText,交换,routingKey) - > {
日志。信息(“消息从Exchange路由到队列失败:exchange:{},route:{},replyCode:{},replyText:{},message:{}”,exchange,routingKey,replyCode,replyText,message);
});
返回 rabbitTemplate;
}
@Bean
公共 Jackson2JsonMessageConverter 转换器() {
返回 新 Jackson2JsonMessageConverter();
}
//发送邮件
public static final String MAIL_QUEUE_NAME = “ mail.queue” ;
公共 静态 最终 字符串MAIL_EXCHANGE_NAME = “ mail.exchange” ;
公共 静态 决赛 字符串MAIL_ROUTING_KEY_NAME = “ mail.routing.key” ;
@Bean
public Queue mailQueue () {
return new Queue(MAIL_QUEUE_NAME, true);
}
@Bean
public DirectExchange mailExchange () {
返回 新的 DirectExchange(MAIL_EXCHANGE_NAME, true, false);
}
@Bean
public Binding mailBinding () {
返回 BindingBuilder.bind(mailQueue())。to(mailExchange())。with(MAIL_ROUTING_KEY_NAME);
}
}
7.TestServiceImpl生产消息
@Service
公共 类 TestServiceImpl 实现 TestService {
@Autowired
私有 MsgLogMapper msgLogMapper;
@Autowired
私有 RabbitTemplate RabbitTemplate;
@Override
public ServerResponse send (邮件) {
字符串msgId = RandomUtil.UUID32();
mail.setMsgId(msgId);
MsgLog msgLog = 新的 MsgLog(msgId,邮件,RabbitConfig.MAIL_EXCHANGE_NAME,RabbitConfig.MAIL_ROUTING_KEY_NAME);
msgLogMapper.insert(msgLog); //消息入库
CorrelationData relativeData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME,RabbitConfig.MAIL_ROUTING_KEY_NAME,MessageHelper.objToMsg(mail),correlationData);//发送消息
返回 ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());
}
}
8.MailConsumer消费消息,发送邮件
@Component
@Slf 4j
公共 类 MailConsumer {
@Autowired
私有 MsgLogService msgLogService;
@Autowired
私人 MailUtil mailUtil;
@RabbitListener(队列= RabbitConfig.MAIL_QUEUE_NAME)
公共 无效 消耗(消息消息,通道通道) 抛出 IOException {
邮件= MessageHelper.msgToObj(message,Mail.class);
log.info(“收到消息:{}”,mail.toString());
字符串msgId = mail.getMsgId();
MsgLog msgLog = msgLogService.selectByMsgId(msgId);
如果 (null == msgLog || msgLog.getStatus()。equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)){ //消费幂等性
log.info(“重复消费,msgId:{}”,msgId);
回报 ;
}
MessageProperties属性= message.getMessageProperties();
长 标签= properties.getDeliveryTag();
布尔 成功= mailUtil.send(mail);
如果 (成功){
msgLogService.updateStatus(msgId,Constant.MsgLogStatus.CONSUMED_SUCCESS);
channel.basicAck(tag, false); //消费确认
} else {
channel.basicNack(tag, false, true);
}
}
}
说明:其实就完成了3件事:1.保证消费幂等性,2.发送邮件,3.更新消息状态,手动ack
9.ResendMsg定时任务重新投递发送失败的消息
@Component
@Slf 4j
公共 类 ResendMsg {
@Autowired
私有 MsgLogService msgLogService;
@Autowired
私有 RabbitTemplate RabbitTemplate;
//最大投递次数
private static final int MAX_TRY_COUNT = 3 ;
/ **
*每30秒拉取投递失败的消息,重新投递
* /
@Scheduled(cron = “ 0/30 * * * *?”)
public void resend () {
log.info(“开始执行定时任务(重新投递消息)“);
清单
说明:每一条消息都和交换路由密钥绑定,所有消息重投共用这一个定时任务即可
五,基本测试
OK,目前为止,代码准备就绪,现在进行正常流程的测试
1.发送请求:

2.后台日志:

3.数据库消息记录:

状态为3,表明已消费,消息重试次数为0,表明一次投递就成功了
4.查看邮箱

发送成功
六,各种异常情况测试
步骤一罗列了很多关于RabbitMQ的知识点,很重要,很核心,而这里也涉及到了这些知识点的实现,接下来就通过异常测试进行验证(这些验证都是围绕此处开头的那张流程图展开的,很重要,所以,再贴一遍)

1.验证消息发送到Exchange失败情况下的对应,对应上图P-> X
如何验证?可以随便指定一个不存在的交换机名称,请求接口,看是否会触发切换

发送失败,原因:回复代码= 404,回复文本= NOT_FOUND-虚拟主机’/’中没有交换’mail.exchangeabcd’,该能够保证消息正确发送到Exchange,测试完成
2.验证消息从Exchange路由到队列失败情况下的相应,对应上图X-> Q
同理,修改一下路由键为不存在的即可,路由失败,触发重定向

发送失败,原因:route:mail.routing.keyabcd,replyCode:312,replyText:NO_ROUTE
3.验证在手动ack模式下,消费端必须进行手动确认(ack),否则消息会一直保存在其中中,直到被消费,对应上图Q-> C
将消费端代码channel.basicAck(tag,false); //消费确认注释掉,查看控制台和rabbitmq管控台


可以看到,虽然消息确实被消费了,但是由于是手动确认模式,而最后又没手动确认,所以,消息仍被rabbitmq保存,所以,手动ack能够保证消息一定被消费,但一定要记得basicAck
4.验证消费端幂等性
接下来上一步,去掉注释,重启服务器,由于有一条错误ack的消息,所以重启后监听到消息,进行消费,但是由于消费前会判断该消息的状态是否可以消费,发现status = 3,即已消费,所以,直接返回,这样就保证了消费端的幂等性,即使由于网络等原因投递成功而未触发转换,从而多次投递,也不会重复消费且发生业务异常

5.验证消费端发生异常消息也不会丢失
很明显,消费端代码可能发生异常,如果不做处理,业务没正确执行,消息却不见了,给我们感觉就是消息丢失了,由于我们消费端代码导致异常捕获,业务异常时,会触发: channel.basicNack(tag,false,true); 这样会告诉告诉rabbitmq该消息消费失败,需要重新入队,可以重新投递到其他正常的消费端进行消费,从而保证消息不被丢失
测试:send方法直接返回false即可(这里跟引发异常一个意思)

可以看到,由于channel.basicNack(tag,false,true),知道ack的消息(unacked)会重新入队并被消费,这样就保证了消息不会走丢
6.验证定时任务的消息重投
实际应用场景中,可能由于网络原因,或消息持续持久化MQ就停机机了,造成投递确认的替代方法ConfirmCallback没有被执行,从而导致数据库该消息状态一直是投递中的状态,此时就需要进行消息重投,甚至也许消息已经被消费了
定时任务只是保证消息100%投递成功,而多次投递的消费幂等性需要消费端自己保证
我们可以将至少和消费成功后更新消息状态的代码注释掉,开启定时任务,查看是否重投


可以看到,消息会重投3次,超过3次放弃,将消息状态放置为投递失败状态,出现这种非正常情况,就需要人工介入排查原因
七,拓展:使用动态代理实现消费端幂等性验证和消费确认(ack)
不知道大家发现没有,在MailConsumer中,真正的业务逻辑其实只是发送邮件为“已消费”状态,并手动确认,实际项目中,可能还有很多生产者-消费者的应用场景,如记录日志,发送短信等等,都需要rabbitmq,如果每次都写这些重复的专用代码,没必要,也难以维护,所以,我们可以将公共代码抽离出来,让核心业务逻辑只关心自己的实现,而不用做其他操作,其实就是AOP
为达到这个目的,有很多方法,可以用spring aop,可以用拦截器,可以用静态代理,也可以用动态代理,在这里,我用的是动态代理
目录结构如下:

核心代码就是代理的实现,此处就不把所有代码贴出来了,只是提供一个思路,我们要进行地把代码写的更简洁更优雅
八,总结
发送邮件其实很简单,但深究起来其实有很多需要注意和完善的点,一个看似很小的知识点,也可以引申出很多问题,甚至涉及到方方面面,这些都需要自己踩坑,当然我这代码肯定还有很多不完善和需要优化的点,希望小伙伴多多提意见和建议
我的代码都是经过自测验证过的,图也都是一点一点自己画的或认真截的,希望小伙伴能学到一点东西,路过的点个赞或点个关注呗,谢谢
RabbitMQ相关知识请参考
https://www.jianshu.com/p/cc3d2017e7b3
Linux安装RabbitMQ请参考
https://www.jianshu.com/p/ee9f7594212b
Windows安装RabbitMQ请参考
https://www.jianshu.com/p/c7726ba4b046
项目Github,欢迎叉
https://github.com/wangzaiplus/springboot/tree/wxw
来源:
www.jianshu.com/p/dca01aad6bc8