11 SpringBoot整合RocketMQ实现事务消息

java1234_小锋 2021-09-15 09:15:26
SpringBoot 事务 RocketMQ 实现 整合


事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。

RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:

Half(Prepare) Message——半消息(预处理消息)

半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。

Message Status Check——消息状态回查

由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check主要用来解决分布式事务中的超时问题。

执行流程:
(C:\Users\java1234\Desktop\RocketMQ\课件\RocketMQ分布式消息队列课件.assets\image-20210829172408953.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DR4JAeSd-1631416355206)(C:\Users\java1234\Desktop\RocketMQ\课件\RocketMQ分布式消息队列课件.assets\image-20210830182140491.png)]

  1. 应用模块遇到要发送事务消息的场景时,先发送prepare消息给MQ。
  2. prepare消息发送成功后,应用模块执行数据库事务(本地事务)。
  3. 根据数据库事务执行的结果,再返回Commit或Rollback给MQ。
  4. 如果是Commit,MQ把消息下发给Consumer端,如果是Rollback,直接删掉prepare消息。
  5. 第3步的执行结果如果没响应,或是超时的,启动定时任务回查事务状态(最多重试15次,超过了默认丢弃此消息),处理结果同第4步。
  6. MQ消费的成功机制由MQ自己保证。

具体实例:

通过rocketMQTemplatesendMessageInTransaction方法发送事务消息

/** * 发送事务消息 */
public void sendTransactionMessage(){

// 构造消息
Message msg = MessageBuilder.withPayload("rocketmq事务消息-01").build();
rocketMQTemplate.sendMessageInTransaction("java1234-transaction-rocketmq",msg,null);
}

定义本地事务处理类,实现RocketMQLocalTransactionListener接口,以及加上@RocketMQTransactionListener注解,这个类似方法的调用是异步的;

executeLocalTransaction方法,当我们处理完业务后,可以根据业务处理情况,返回事务执行状态,有bollback, commit or unknown三种,分别是回滚事务,提交事务和未知;根据事务消息执行流程,如果返回bollback,则直接丢弃消息;如果是返回commit,则消费消息;如果是unknow,则继续等待,然后调用checkLocalTransaction方法,最多重试15次,超过了默认丢弃此消息;

checkLocalTransaction方法,是当MQ Server未得到MQ发送方应答,或者超时的情况,或者应答是unknown的情况,调用此方法进行检查确认,返回值和上面的方法一样;

@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {

@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

// ... local transaction process, return bollback, commit or unknown
System.out.println("executeLocalTransaction");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {

// ... check transaction status and return bollback, commit or unknown
System.out.println("checkLocalTransaction");
return RocketMQLocalTransactionState.COMMIT;
}
}

运行:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8ZwUmiyX-1631416355207)(C:\Users\java1234\Desktop\RocketMQ\课件\RocketMQ分布式消息队列课件.assets\image-20210830234654676.png)]

生产者端两个方法都执行到了,

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XvyLC4uq-1631416355208)(C:\Users\java1234\Desktop\RocketMQ\课件\RocketMQ分布式消息队列课件.assets\image-20210830234719938.png)]

消费端也获取到了消息;

执行如下:

生产者端发送half消息到MQ-SERVER,然后异步执行executeLocalTransaction方法,返回unknown,MQ-SERVER接收到unknown后,继续等待,然后再执行checkLocalTransaction确认,返回commit,MQ-SERVER得到commit后,消费端才可以消费消息;

说明:这个是锋哥的RocketMQ备课笔记,等备课完,会发布配套的视频教程,如有需要,可以先加锋哥WX:java1239 欢迎白嫖
没问题!

微信搜一搜公众号【java1234】关注这个放荡不羁的程序员,关注后回复【资料】有我准备的一线大厂笔试面试资料以及简历模板。

版权声明
本文为[java1234_小锋]所创,转载请带上原文链接,感谢
https://blog.csdn.net/caoli201314/article/details/120248361

  1. 快速从 Windows 切换到 Linux 环境
  2. 五分钟向MySql数据库插入一千万条数据
  3. Java日期时间API系列42-----一种高效的中文日期格式化和解析方法
  4. 用Java实现红黑树
  5. 使用Redis Stream来做消息队列和在Asp.Net Core中的实现
  6. 海量列式非关系数据库HBase 架构,shell与API
  7. Architecture, Shell et API de base de données non relationnelle à grande échelle
  8. Mise en œuvre de l'arbre Rouge et noir en Java
  9. Java Date Time API Series 42 - - a efficient Chinese Date Format and Analysis Method
  10. 5 minutes pour insérer 10 millions de données dans la base de données MySQL
  11. Passage rapide de Windows à l'environnement Linux
  12. Notes on Java backend development of PostgreSQL (I)
  13. 海量列式非關系數據庫HBase 架構,shell與API
  14. Byte Jump the latest open source, the most Classic hashtap Graph details,
  15. L'interview Java de Byte Hopping Society, l'analyse super populaire de l'utilisation et du code source de countdownlatch,
  16. "Anti Mafia storm" Wang Zhifei's love history is really wonderful: he divorced Zhang Xinyi and married a 14-year-old wife
  17. In spring in the jade mansion, Jia Fengyuan was not moved by his brother's death. Why was su Yingxue changed? The reason is realistic
  18. Adam Oracle Oracle fully constructs Adam token incentive for ecological development
  19. 实战SpringCloud通用请求字段拦截处理,超过500人面试阿里,
  20. 宅家36天咸鱼翻身入职腾讯,Zookeeper一致性级别分析,
  21. The first starcoin & move hacksong source code analysis - P (a)
  22. Zhaijia 36 days Salt Fish turn into Tencent, Zookeeper Consistency level analysis,
  23. Traitement de l'interception des champs de demande communs de Spring Cloud, plus de 500 personnes interviewent Ali,
  24. About JavaScript modules
  25. Object oriented programming (2)
  26. Java日期时间API系列42-----一种高效的中文日期格式化和解析方法
  27. Java日期時間API系列42-----一種高效的中文日期格式化和解析方法
  28. 宅家36天鹹魚翻身入職騰訊,Zookeeper一致性級別分析,
  29. Java Date Time API Series 42 - - a efficient Chinese Date Format and Analysis Method
  30. 已成功拿下字节、腾讯、脉脉offer,7年老Java一次操蛋的面试经历,
  31. 小米Java社招面试,每次面试必问的二叉树的设计与编码,
  32. 小米Java校招面试,阿里、百度、美团、携程、蚂蚁面经分享,
  33. 小米Java校招面試,阿裏、百度、美團、攜程、螞蟻面經分享,
  34. Xiaomi Java School Recruitment interview, Ali, baidu, meituan, ctrip, ant Facebook Sharing,
  35. La conception et le codage de l'arbre binaire requis pour chaque entrevue d'embauche de la société Java millet;
  36. A remporté avec succès Byte, Tencent, Pulse offer, 7 ans Java une expérience d'entrevue de baise,
  37. 干货来袭,Java岗面试12家大厂成功跳槽,
  38. 常用Java框架面试题目,现在做Java开发有前途吗?
  39. 常用Java框架面試題目,現在做Java開發有前途嗎?
  40. Les questions d'entrevue couramment utilisées pour le cadre Java sont - elles prometteuses pour le développement Java?
  41. L'arrivée de marchandises sèches, l'entretien d'emploi Java 12 grandes usines ont réussi à changer d'emploi,
  42. Multiple postures for handling container time in k8s environment
  43. Echarts remove left Gap, Blank
  44. Hotspot Weekly | zoom $100 million, docker fees, $38 billion Data bricks
  45. JsonMappingException: No serializer found for class org.apache.ibatis.executor.loader.javassist.JavassistProxyFactory...
  46. Java. Security. Securerandom source code analysis Java. Security. EGD = file: / dev /. / urandom
  47. When using IntelliJ idea, jump directly and quickly from the mapper interface to mapper.xml
  48. When idea writes SQL in mybatis XML, the solution to the problems of table name, field and red reporting
  49. Spring cloud integrates Nacos
  50. 应届毕业生Java笔试题目,2021大厂Java社招最全面试题,
  51. Liver explosion! Take you to understand Hadoop serialization
  52. linux系列之:告诉他,他根本不懂kill
  53. java版gRPC实战之三:服务端流
  54. RabbitMQ核心知识总结!
  55. linux系列之:告诉他,他根本不懂kill
  56. java版gRPC实战之三:服务端流
  57. RabbitMQ核心知识总结!
  58. 10天拿到字节跳动Java岗位offer,学习Java开发的步骤
  59. 10天拿到字节跳动Java岗位offer,Java知识点思维导图
  60. Résumé des connaissances de base de rabbitmq!