导航:首页 > 源码编译 > rocketmq事务源码详解

rocketmq事务源码详解

发布时间:2023-07-04 01:01:13

❶ RocketMQ的事务消息

RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账,A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银行账户扣除一万元”这个操作同时成功或者同时失败。
RocketMQ采用两阶段提交的方式实现事务消息,TransactionMQProcer处理上面情况的流程是,先发一个“准备从B银行账户增加一万元”的消息,发送成功后做从A银行账户扣除一万元的操作,根据操作结果是否成功,确定之前的“准备从B银行账户增加一万元”的消息是做commit还是rollback,具体流程如下:
1)发送方向RocketMQ发送“待确认”消息。
2)RocketMQ将收到的“待确认”消息持久化成功后,向发送方回复消息已经发送成功,此时第一阶段消息发送完成。
3)发送方开始执行本地事件逻辑。
4)发送方根据本地事件执行结果向RocketMQ发送二次确认(Commit或是Rollback)消息,RocketMQ收到Commit状态则将第一阶段消息标记为可投递,订阅方将能够收到该消息;收到Rollback状态则删除第一阶段的消息,订阅方接收不到该消息。
5)如果出现异常情况,步骤4)提交的二次确认最终未到达RocketMQ,服务器在经过固定时间段后将对“待确认”消息发起回查请求。
6)发送方收到消息回查请求后(如果发送一阶段消息的Procer不能工作,回查请求将被发送到和Procer在同一个Group里的其他Procer),通过检查对应消息的本地事件执行结果返回Commit或Roolback状态。
7)RocketMQ收到回查请求后,按照步骤4)的逻辑处理。

上面的逻辑似乎很好地实现了事务消息功能,它也是RocketMQ之前的版本实现事务消息的逻辑。
但是因为RocketMQ依赖将数据顺序写到磁盘这个特征来提高性能,步骤4)却需要更改第一阶段消息的状态,这样会造成磁盘Catch的脏页过多,降低系统的性能。所以RocketMQ在4.x的版本中将这部分功能去除。系统中的一些上层Class都还在,用户可以根据实际需求实现自己的事务功能。
客户端有三个类来支持用户实现事务消息,
第一个类是LocalTransaction-Executer,用来实例化步骤3)的逻辑,根据情况返回LocalTransactionState.ROLLBACK_MESSAGE或者
LocalTransactionState.COMMIT_MESSAGE状态。
第二个类是TransactionMQProcer,它的用法和DefaultMQProcer类似,要通过它启动一个Procer并发消息,但是比DefaultMQProcer多设置本地事务处理函数和回查状态函数。
第三个类是TransactionCheckListener,实现步骤5)中MQ服务器的回查请求,返回LocalTransactionState.ROLLBACK_MESSAGE或者LocalTransactionState.COMMIT_MESSAGE

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1.事务消息发送及提交:
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)。
2.补偿流程:
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”。
(2) Procer收到回查消息,检查回查消息对应的本地事务的状态。
(3) 根据本地事务状态,重新Commit或者Rollback。
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息。然后二阶段会显示执行提交或者回滚half消息(逻辑删除)。当然,为了防止二阶段操作失败,RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。
在RocketMQ中,消息在服务端的存储结构如下,每条消息都会有对应的索引信息,Consumer通 过ConsumeQueue这个二级索引来读取消息实体内容,其流程如下:

RocketMQ的具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。其实改变消息主题是RocketMQ的常用“套路”,回想一下延时消息的实现机制。RMQ_SYS_TRANS_HALF_TOPIC

在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。

RocketMQ将Op消息写入到全局一个特定的Topic中通过源码中的方法—
TransactionalMessageUtil.buildOpTopic();这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。

在执行二阶段Commit操作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。

如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Procer端(同一个Group的Procer),由Procer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。
Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。
值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。

TxConsumer类实现

❷ rocketmq 发送失败一般怎么处理

一:RocketMQ简介
RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:

1.能够保证严格的消息顺序

2.提供丰富的消息拉取模式

3.高效的订阅者水平扩展能力

4.实时的消息订阅机制

5.亿级消息堆积能力
二:安装RocketMQ
下载源码
首先我们从githup上获取RocketMQ的源码,目前最新的版本为3.5.8,下载地址为: 或者 wget /alibaba/RocketMQ/archive/v3.5.8.tar.gz。请注意:此时我们下载的是源码,直接解压时不能用的,所以我们需要编译之后才能使用。
编译源码
在进行编译源码之前我们需要安装JDK。如果你已经安装过了,请跳过这里。如果你还没有安装过JDK,请参考这篇文章(Linux环境下安装JDK)。然后我们还需要安装一下Maven。Maven的安装还是比较简单,只需要去官方上下载的安装吧,然后直接解压,再配置一下环境变量就OK。接下来我们把刚才下载来的RockeMQ的源码解压到/usr/local/rockemq-source文件夹中。在源码中有一个Install.sh。如图所示:
。运行sh install.sh。在编译完成之后,我们只要target目录下的alibaba-rocketmq这个文件夹中内容,把alibaba-rocketmq文件夹中的内容移动到/usr/local/rocketmq中。如果你不想编译的话,可以从这里下载编译之后的rocketmq。(rocketmq3.5.8)。
配置环境变量
接下来我们需要配置一下环境变量。在终端中输入以下命令:vi /etc/profile ,在文件的末尾中添加如下两句话:export rocketmq=/usr/local/rocketmq export PATH=$PATH:$rocketmq/bin。接下来我们使配置的换将变量生效:source /etc/profile.
三:启动RocketMQ
接下来我们启动一下刚才编译的RocketMQ.在启动之前我们需要修改一下RocketMQ启动的内存大小(如果你的系统内存比较大的话,请忽略)。我们进入到/usr/local/rocketmq/bin中,在终端中输入以下命令修改mqnamesrv的内存大小:vi runserver.sh.修改为如图的内容:
,接下来修改broker的内存大小:vi runbroker.sh:

启动mqnameserver
进入到/usr/local/rocketmq/bin中输入以下命令:nohup sh mqnamesrv > ~/logs/rocketmqlogs/namesrv.log 2>&1 &。注意最后的这个 & 不要少。
启动mqbroker
进入到/usr/local/rocketmq/bin中输入以下命令:nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true > ~/logs/rocketmqlogs/broker.log 2>&1 &。注意:localhost可以换成你刚才启动mqnamesrv的IP。autoCreateTopicEnable=true
这句话不要少了。最后的 & 也不要少了。
我们可以通过 ps aux | grep java命令来查看启动的情况。

到此,rocketmq的安装完毕。
四:RocketMQ的小例子
procer:

[java] view plain
package com.zkn.newlearn.rocketmq;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.procer.DefaultMQProcer;
import com.alibaba.rocketmq.client.procer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

import java.util.concurrent.TimeUnit;

/**
* Created by zkn on 2016/10/27.
*/
public class ProcerTest01 {

public static void main(String[] args) {

/**
* 一个应用创建一个Procer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ProcerGroupName需要由应用来保证唯一<br>
* ProcerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
* 因为服务器会回查这个Group下的任意一个Procer
*/
DefaultMQProcer procer = new DefaultMQProcer("ProcerGroupName");
//procer.setNamesrvAddr("192.168.180.1:9876");
procer.setNamesrvAddr("192.168.180.133:9876");
procer.setInstanceName("Procer");
/**
* Procer对象在使用之前必须要调用start初始化,初始化一次即可<br>
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
try {
procer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
for (int i = 0; i < 100; i++) {
try {
/**
* 下面这段代码表明一个Procer对象可以发送多个topic,多个tag的消息。
* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
*/
{
Message msg = new Message("TopicTest1",// topic
"TagA",// tag
"OrderID001",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}

{
Message msg = new Message("TopicTest2",
"TagB",
"OrderID001",
("Hello MetaQ TagB".getBytes()));

SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}

{
Message msg = new Message("TopicTest3",
"TagC",
"OrderID001",
("Hello MetaQ TagC").getBytes());

SendResult sendResult = procer.send(msg);

System.out.println(sendResult);
}

TimeUnit.MILLISECONDS.sleep(1000);

} catch (MQClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}
}
/**
* 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
* 注意:我们建议应用在JBOSS、Tomcat等容器的退出销毁方法里调用shutdown方法
*/
procer.shutdown();
}
}

阅读全文

与rocketmq事务源码详解相关的资料

热点内容
dvd光盘存储汉子算法 浏览:757
苹果邮件无法连接服务器地址 浏览:962
phpffmpeg转码 浏览:671
长沙好玩的解压项目 浏览:144
专属学情分析报告是什么app 浏览:564
php工程部署 浏览:833
android全屏透明 浏览:737
阿里云服务器已开通怎么办 浏览:803
光遇为什么登录时服务器已满 浏览:302
PDF分析 浏览:484
h3c光纤全工半全工设置命令 浏览:143
公司法pdf下载 浏览:381
linuxmarkdown 浏览:350
华为手机怎么多选文件夹 浏览:683
如何取消命令方块指令 浏览:349
风翼app为什么进不去了 浏览:778
im4java压缩图片 浏览:362
数据查询网站源码 浏览:150
伊克塞尔文档怎么进行加密 浏览:892
app转账是什么 浏览:163