導航:首頁 > 源碼編譯 > rocketmq事務源碼詳解

rocketmq事務源碼詳解

發布時間:2023-07-04 01:01:13

❶ RocketMQ的事務消息

RocketMQ的事務消息,是指發送消息事件和其他事件需要同時成功或同時失敗。比如銀行轉賬,A銀行的某賬戶要轉一萬元到B銀行的某賬戶。A銀行發送「B銀行賬戶增加一萬元」這個消息,要和「從A銀行賬戶扣除一萬元」這個操作同時成功或者同時失敗。
RocketMQ採用兩階段提交的方式實現事務消息,TransactionMQProcer處理上面情況的流程是,先發一個「准備從B銀行賬戶增加一萬元」的消息,發送成功後做從A銀行賬戶扣除一萬元的操作,根據操作結果是否成功,確定之前的「准備從B銀行賬戶增加一萬元」的消息是做commit還是rollback,具體流程如下:
1)發送方向RocketMQ發送「待確認」消息。
2)RocketMQ將收到的「待確認」消息持久化成功後,向發送方回復消息已經發送成功,此時第一階段消息發送完成。
3)發送方開始執行本地事件邏輯。
4)發送方根據本地事件執行結果向RocketMQ發送二次確認(Commit或是Rollback)消息,RocketMQ收到Commit狀態則將第一階段消息標記為可投遞,訂閱方將能夠收到該消息;收到Rollback狀態則刪除第一階段的消息,訂閱方接收不到該消息。
5)如果出現異常情況,步驟4)提交的二次確認最終未到達RocketMQ,伺服器在經過固定時間段後將對「待確認」消息發起回查請求。
6)發送方收到消息回查請求後(如果發送一階段消息的Procer不能工作,回查請求將被發送到和Procer在同一個Group里的其他Procer),通過檢查對應消息的本地事件執行結果返回Commit或Roolback狀態。
7)RocketMQ收到回查請求後,按照步驟4)的邏輯處理。

上面的邏輯似乎很好地實現了事務消息功能,它也是RocketMQ之前的版本實現事務消息的邏輯。
但是因為RocketMQ依賴將數據順序寫到磁碟這個特徵來提高性能,步驟4)卻需要更改第一階段消息的狀態,這樣會造成磁碟Catch的臟頁過多,降低系統的性能。所以RocketMQ在4.x的版本中將這部分功能去除。系統中的一些上層Class都還在,用戶可以根據實際需求實現自己的事務功能。
客戶端有三個類來支持用戶實現事務消息,
第一個類是LocalTransaction-Executer,用來實例化步驟3)的邏輯,根據情況返回LocalTransactionState.ROLLBACK_MESSAGE或者
LocalTransactionState.COMMIT_MESSAGE狀態。
第二個類是TransactionMQProcer,它的用法和DefaultMQProcer類似,要通過它啟動一個Procer並發消息,但是比DefaultMQProcer多設置本地事務處理函數和回查狀態函數。
第三個類是TransactionCheckListener,實現步驟5)中MQ伺服器的回查請求,返回LocalTransactionState.ROLLBACK_MESSAGE或者LocalTransactionState.COMMIT_MESSAGE

上圖說明了事務消息的大致方案,其中分為兩個流程:正常事務消息的發送及提交、事務消息的補償流程。
1.事務消息發送及提交:
(1) 發送消息(half消息)。
(2) 服務端響應消息寫入結果。
(3) 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行)。
(4) 根據本地事務狀態執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)。
2.補償流程:
(1) 對沒有Commit/Rollback的事務消息(pending狀態的消息),從服務端發起一次「回查」。
(2) Procer收到回查消息,檢查回查消息對應的本地事務的狀態。
(3) 根據本地事務狀態,重新Commit或者Rollback。
其中,補償階段用於解決消息Commit或者Rollback發生超時或者失敗的情況。

在RocketMQ事務消息的主要流程中,一階段的消息如何對用戶不可見。其中,事務消息相對普通消息最大的特點就是一階段發送的消息對用戶是不可見的。那麼,如何做到寫入消息但是對用戶不可見呢?RocketMQ事務消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費隊列,然後改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由於消費組未訂閱該主題,故消費端無法消費half類型的消息。然後二階段會顯示執行提交或者回滾half消息(邏輯刪除)。當然,為了防止二階段操作失敗,RocketMQ會開啟一個定時任務,從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進行消費,根據生產者組獲取一個服務提供者發送回查事務狀態請求,根據事務狀態來決定是提交或回滾消息。
在RocketMQ中,消息在服務端的存儲結構如下,每條消息都會有對應的索引信息,Consumer通 過ConsumeQueue這個二級索引來讀取消息實體內容,其流程如下:

RocketMQ的具體實現策略是:寫入的如果事務消息,對消息的Topic和Queue等屬性進行替換,同時將原來的Topic和Queue信息存儲到消息的屬性中,正因為消息主題被替換,故消息並不會轉發到該原主題的消息消費隊列,消費者無法感知消息的存在,不會消費。其實改變消息主題是RocketMQ的常用「套路」,回想一下延時消息的實現機制。RMQ_SYS_TRANS_HALF_TOPIC

在完成一階段寫入一條對用戶不可見的消息後,二階段如果是Commit操作,則需要讓消息對用戶可見;如果是Rollback則需要撤銷一階段的消息。先說Rollback的情況。對於Rollback,本身一階段的消息對用戶是不可見的,其實不需要真正撤銷消息(實際上RocketMQ也無法去真正的刪除一條消息,因為是順序寫文件的)。但是區別於這條消息沒有確定狀態(Pending狀態,事務懸而未決),需要一個操作來標識這條消息的最終狀態。RocketMQ事務消息方案中引入了Op消息的概念,用Op消息標識事務消息已經確定的狀態(Commit或者Rollback)。如果一條事務消息沒有對應的Op消息,說明這個事務的狀態還無法確定(可能是二階段失敗了)。引入Op消息後,事務消息無論是Commit或者Rollback都會記錄一個Op操作。Commit相對於Rollback只是在寫入Op消息前創建Half消息的索引。

RocketMQ將Op消息寫入到全局一個特定的Topic中通過源碼中的方法—
TransactionalMessageUtil.buildOpTopic();這個Topic是一個內部的Topic(像Half消息的Topic一樣),不會被用戶消費。Op消息的內容為對應的Half消息的存儲的Offset,這樣通過Op消息能索引到Half消息進行後續的回查操作。

在執行二階段Commit操作時,需要構建出Half消息的索引。一階段的Half消息由於是寫到一個特殊的Topic,所以二階段構建索引時需要讀取出Half消息,並將Topic和Queue替換成真正的目標的Topic和Queue,之後通過一次普通消息的寫入操作來生成一條對用戶可見的消息。所以RocketMQ事務消息二階段其實是利用了一階段存儲的消息的內容,在二階段時恢復出一條完整的普通消息,然後走一遍消息寫入流程。

如果在RocketMQ事務消息的二階段過程中失敗了,例如在做Commit操作時,出現網路問題導致Commit失敗,那麼需要通過一定的策略使這條消息最終被Commit。RocketMQ採用了一種補償機制,稱為「回查」。Broker端對未確定狀態的消息發起回查,將消息發送到對應的Procer端(同一個Group的Procer),由Procer根據消息來檢查本地事務的狀態,進而執行Commit或者Rollback。
Broker端通過對比Half消息和Op消息進行事務消息的回查並且推進CheckPoint(記錄那些事務消息的狀態是確定的)。
值得注意的是,rocketmq並不會無休止的的信息事務狀態回查,默認回查15次,如果15次回查還是無法得知事務狀態,rocketmq默認回滾該消息。

TxConsumer類實現

❷ rocketmq 發送失敗一般怎麼處理

一:RocketMQ簡介
RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點:

1.能夠保證嚴格的消息順序

2.提供豐富的消息拉取模式

3.高效的訂閱者水平擴展能力

4.實時的消息訂閱機制

5.億級消息堆積能力
二:安裝RocketMQ
下載源碼
首先我們從githup上獲取RocketMQ的源碼,目前最新的版本為3.5.8,下載地址為: 或者 wget /alibaba/RocketMQ/archive/v3.5.8.tar.gz。請注意:此時我們下載的是源碼,直接解壓時不能用的,所以我們需要編譯之後才能使用。
編譯源碼
在進行編譯源碼之前我們需要安裝JDK。如果你已經安裝過了,請跳過這里。如果你還沒有安裝過JDK,請參考這篇文章(Linux環境下安裝JDK)。然後我們還需要安裝一下Maven。Maven的安裝還是比較簡單,只需要去官方上下載的安裝吧,然後直接解壓,再配置一下環境變數就OK。接下來我們把剛才下載來的RockeMQ的源碼解壓到/usr/local/rockemq-source文件夾中。在源碼中有一個Install.sh。如圖所示:
。運行sh install.sh。在編譯完成之後,我們只要target目錄下的alibaba-rocketmq這個文件夾中內容,把alibaba-rocketmq文件夾中的內容移動到/usr/local/rocketmq中。如果你不想編譯的話,可以從這里下載編譯之後的rocketmq。(rocketmq3.5.8)。
配置環境變數
接下來我們需要配置一下環境變數。在終端中輸入以下命令:vi /etc/profile ,在文件的末尾中添加如下兩句話:export rocketmq=/usr/local/rocketmq export PATH=$PATH:$rocketmq/bin。接下來我們使配置的換將變數生效:source /etc/profile.
三:啟動RocketMQ
接下來我們啟動一下剛才編譯的RocketMQ.在啟動之前我們需要修改一下RocketMQ啟動的內存大小(如果你的系統內存比較大的話,請忽略)。我們進入到/usr/local/rocketmq/bin中,在終端中輸入以下命令修改mqnamesrv的內存大小:vi runserver.sh.修改為如圖的內容:
,接下來修改broker的內存大小:vi runbroker.sh:

啟動mqnameserver
進入到/usr/local/rocketmq/bin中輸入以下命令:nohup sh mqnamesrv > ~/logs/rocketmqlogs/namesrv.log 2>&1 &。注意最後的這個 & 不要少。
啟動mqbroker
進入到/usr/local/rocketmq/bin中輸入以下命令:nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true > ~/logs/rocketmqlogs/broker.log 2>&1 &。注意:localhost可以換成你剛才啟動mqnamesrv的IP。autoCreateTopicEnable=true
這句話不要少了。最後的 & 也不要少了。
我們可以通過 ps aux | grep java命令來查看啟動的情況。

到此,rocketmq的安裝完畢。
四:RocketMQ的小例子
procer:

[java] view plain
package com.zkn.newlearn.rocketmq;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.procer.DefaultMQProcer;
import com.alibaba.rocketmq.client.procer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

import java.util.concurrent.TimeUnit;

/**
* Created by zkn on 2016/10/27.
*/
public class ProcerTest01 {

public static void main(String[] args) {

/**
* 一個應用創建一個Procer,由應用來維護此對象,可以設置為全局對象或者單例<br>
* 注意:ProcerGroupName需要由應用來保證唯一<br>
* ProcerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵,
* 因為伺服器會回查這個Group下的任意一個Procer
*/
DefaultMQProcer procer = new DefaultMQProcer("ProcerGroupName");
//procer.setNamesrvAddr("192.168.180.1:9876");
procer.setNamesrvAddr("192.168.180.133:9876");
procer.setInstanceName("Procer");
/**
* Procer對象在使用之前必須要調用start初始化,初始化一次即可<br>
* 注意:切記不可以在每次發送消息時,都調用start方法
*/
try {
procer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
for (int i = 0; i < 100; i++) {
try {
/**
* 下面這段代碼表明一個Procer對象可以發送多個topic,多個tag的消息。
* 注意:send方法是同步調用,只要不拋異常就標識成功。但是發送成功也可會有多種狀態,<br>
* 例如消息寫入Master成功,但是Slave不成功,這種情況消息屬於成功,但是對於個別應用如果對消息可靠性要求極高,<br>
* 需要對這種情況做處理。另外,消息可能會存在發送失敗的情況,失敗重試由應用來處理。
*/
{
Message msg = new Message("TopicTest1",// topic
"TagA",// tag
"OrderID001",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}

{
Message msg = new Message("TopicTest2",
"TagB",
"OrderID001",
("Hello MetaQ TagB".getBytes()));

SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}

{
Message msg = new Message("TopicTest3",
"TagC",
"OrderID001",
("Hello MetaQ TagC").getBytes());

SendResult sendResult = procer.send(msg);

System.out.println(sendResult);
}

TimeUnit.MILLISECONDS.sleep(1000);

} catch (MQClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}
}
/**
* 應用退出時,要調用shutdown來清理資源,關閉網路連接,從MetaQ伺服器上注銷自己
* 注意:我們建議應用在JBOSS、Tomcat等容器的退出銷毀方法里調用shutdown方法
*/
procer.shutdown();
}
}

閱讀全文

與rocketmq事務源碼詳解相關的資料

熱點內容
dvd光碟存儲漢子演算法 瀏覽:757
蘋果郵件無法連接伺服器地址 瀏覽:962
phpffmpeg轉碼 瀏覽:671
長沙好玩的解壓項目 瀏覽:144
專屬學情分析報告是什麼app 瀏覽:564
php工程部署 瀏覽:833
android全屏透明 瀏覽:737
阿里雲伺服器已開通怎麼辦 瀏覽:803
光遇為什麼登錄時伺服器已滿 瀏覽:302
PDF分析 瀏覽:484
h3c光纖全工半全工設置命令 瀏覽:143
公司法pdf下載 瀏覽:381
linuxmarkdown 瀏覽:350
華為手機怎麼多選文件夾 瀏覽:683
如何取消命令方塊指令 瀏覽:349
風翼app為什麼進不去了 瀏覽:778
im4java壓縮圖片 瀏覽:362
數據查詢網站源碼 瀏覽:150
伊克塞爾文檔怎麼進行加密 瀏覽:892
app轉賬是什麼 瀏覽:163