❶ 學python最想要提升的是哪些地方
1.學習 Python 包並實現基本的爬蟲過程
大部分爬蟲都是按「發送請求——獲得頁面——解析頁面——抽取並儲存內容」這樣的流程來進行,這其實也是模擬了我們使用瀏覽器獲取網頁信息的過程。Python中爬蟲相關的包很多:urllib、requests、bs4、scrapy、pyspider 等,建議從requests+Xpath 開始,requests 負責連接網站,返回網頁,Xpath 用於解析網頁,便於抽取數據。
如果你用過 BeautifulSoup,會發現 Xpath 要省事不少,一層一層檢查元素代碼的工作,全都省略了。這樣下來基本套路都差不多,一般的靜態網站根本不在話下。當然如果你需要爬取非同步載入的網站,可以學習瀏覽器抓包分析真實請求或者學習Selenium來實現自動化。
2.了解非結構化數據的存儲
爬回來的數據可以直接用文檔形式存在本地,也可以存入資料庫中。開始數據量不大的時候,你可以直接通過 Python 的語法或 pandas 的方法將數據存為csv這樣的文件。當然你可能發現爬回來的數據並不是干凈的,可能會有缺失、錯誤等等,你還需要對數據進行清洗,可以學習 pandas 包的基本用法來做數據的預處理,得到更干凈的數據。
3.學習scrapy,搭建工程化爬蟲
掌握前面的技術一般量級的數據和代碼基本沒有問題了,但是在遇到非常復雜的情況,可能仍然會力不從心,這個時候,強大的 scrapy 框架就非常有用了。scrapy 是一個功能非常強大的爬蟲框架,它不僅能便捷地構建request,還有強大的 selector 能夠方便地解析 response,然而它最讓人驚喜的還是它超高的性能,讓你可以將爬蟲工程化、模塊化。學會 scrapy,你可以自己去搭建一些爬蟲框架,你就基本具備Python爬蟲工程師的思維了。
4.學習資料庫知識,應對大規模數據存儲與提取
Python客棧送紅包、紙質書
爬回來的數據量小的時候,你可以用文檔的形式來存儲,一旦數據量大了,這就有點行不通了。所以掌握一種資料庫是必須的,學習目前比較主流的 MongoDB 就OK。MongoDB 可以方便你去存儲一些非結構化的數據,比如各種評論的文本,圖片的鏈接等等。你也可以利用PyMongo,更方便地在Python中操作MongoDB。因為這里要用到的資料庫知識其實非常簡單,主要是數據如何入庫、如何進行提取,在需要的時候再學習就行。
5.掌握各種技巧,應對特殊網站的反爬措施
當然,爬蟲過程中也會經歷一些絕望啊,比如被網站封IP、比如各種奇怪的驗證碼、userAgent訪問限制、各種動態載入等等。遇到這些反爬蟲的手段,當然還需要一些高級的技巧來應對,常規的比如訪問頻率控制、使用代理IP池、抓包、驗證碼的OCR處理等等。往往網站在高效開發和反爬蟲之間會偏向前者,這也為爬蟲提供了空間,掌握這些應對反爬蟲的技巧,絕大部分的網站已經難不到你了。
6.分布式爬蟲,實現大規模並發採集,提升效率
爬取基本數據已經不是問題了,你的瓶頸會集中到爬取海量數據的效率。這個時候,相信你會很自然地接觸到一個很厲害的名字:分布式爬蟲。分布式這個東西,聽起來很恐怖,但其實就是利用多線程的原理讓多個爬蟲同時工作,需要你掌握Scrapy+ MongoDB + Redis 這三種工具。Scrapy 前面我們說過了,用於做基本的頁面爬取,MongoDB 用於存儲爬取的數據,Redis 則用來存儲要爬取的網頁隊列,也就是任務隊列。所以有些東西看起來很嚇人,但其實分解開來,也不過如此。當你能夠寫分布式的爬蟲的時候,那麼你可以去嘗試打造一些基本的爬蟲架構了,實現一些更加自動化的數據獲取。
只要按照以上的Python爬蟲學習路線,一步步完成,即使是新手小白也能成為老司機,而且學下來會非常輕松順暢。所以新手在一開始的時候,盡量不要系統地去啃一些東西,找一個實際的項目,直接開始操作。
其實學Python編程和練武功其實很相似,入門大致這樣幾步:找本靠譜的書,找個靠譜的師傅,找一個地方開始練習。
學語言也是這樣的:選一本通俗易懂的書,找一個好的視頻資料,然後自己裝一個IDE工具開始邊學邊寫。
7.給初學Python編程者的建議:
①信心。可能你看了視頻也沒在屏幕上做出點啥,都沒能把程序運行起來。但是要有自信,所有人都是這樣過來的。
②選擇適合自己的教程。有很早的書籍很經典,但是不是很適合你,很多書籍是我們學過一遍Python之後才會發揮很大作用。
③寫代碼,就是不斷地寫,練。這不用多說,學習什麼語言都是這樣。總看視頻,編不出東西。可以從書上的小案例開始寫,之後再寫完整的項目。
④除了學Python,計算機的基礎也要懂得很多,補一些英語知識也行。
⑤不但會寫,而且會看,看源碼是一個本領,調試代碼更是一個本領,就是解決問題的能力,挑錯。理解你自己的報錯信息,自己去解決。
⑥當你到達了一個水平,就多去看官方的文檔,在CSDN上面找下有關Python的博文或者群多去交流。
希望想學習Python的利用好現在的時間,管理好自己的學習時間,有效率地學習Python,Python這門語言可以做很多事情。
❷ lwip源碼解析
lwip源碼解析
lwip 是 TCP/IP 協議棧的輕量化實現,它在嵌入式平台上廣泛應用,尤其在資源有限的 MCU 設備上。lwip 的體積小巧,運行內存需求僅幾十 KB,支持裸機移植和操作系統移植。
lwip 提供了三種介面類型:raw api、netconn api 和 socket api。raw api 是基於事件驅動,以回調函數形式實現,適用於裸機環境。netconn api 是順序 API,需要線程機制支持,適用於操作系統環境。socket api 同樣是順序 API,基於 netconn api 重新封裝,以兼容 POSIX 標准。
在 lwip 的內存管理方面,它有自己的動態內存管理機制,包括簡單的內存管理和基於內存池的內存管理。簡單內存管理通過 mem 模塊實現,基於全局數組劃分內存區域,用戶通過 mem_init 初始化堆區,並在分配和釋放內存時,通過查找和合並空閑內存塊進行操作。基於內存池的內存管理使用 memp 模塊,通過內存池分配和釋放內存,內存池由多個固定大小的內存塊構成,適用於特定類型的數據存儲。
lwip 使用 pbuf 結構體對數據進行封裝和協議棧內部傳遞,pbuf 的重要性與 linux 內核協議棧中的 skb_buff 類似。pbuf 可以是 PBUF_RAM、PBUF_POOL、PBUF_REF 或 PBUF_ROM 類型,分別用於不同內存管理和數據引用場景。
在數據收發過程中,以 UDP 為例,lwip 通過注冊回調函數實現 UDP 接收,通過創建 pbuf 和調用相關函數實現 UDP 發送。發送時,lwip 會根據目的 IP 地址查找路由、綁定本地 IP 和埠、添加 UDP 和 IP 頭部信息,並最終通過網卡驅動發送數據。接收時,數據通過注冊的回調函數以 pbuf 形式傳遞給協議棧,解析乙太網幀頭部後,根據協議類型調用不同介面進行進一步處理。
在 TCP 協議的實現中,lwip 通過監聽和接收回調函數管理連接狀態,處理 SYN 報文時,會生成初始化序列號,進行序列號與確認機制,並通過計算 MSS 和發送窗口管理數據收發過程中的數據管理。在接收過程,lwip 會接收 TCP 報文並處理,計算接收窗口裁剪數據,並通過接收數據緩沖區進行應用層的數據處理。當出現超時重傳時,lwip 會更新重傳定時器、計算 RTT 和 RTO,並在快重傳和快恢復模式下調整擁塞窗口。
❸ MySQL與Redis資料庫連接池介紹(圖示+源碼+代碼演示)
資料庫連接池(Connection pooling)是程序啟動時建立足夠的資料庫連接,並將這些連接組成一個連接池,由程序動態地對池中的連接進行申請,使用,釋放。
簡單的說:創建資料庫連接是一個很耗時的操作,也容易對資料庫造成安全隱患。所以,在程序初始化的時候,集中創建多個資料庫連接,並把他們集中管理,供程序使用,可以保證較快的資料庫讀寫速度,還更加安全可靠。
不使用資料庫連接池
如果不使用資料庫連接池,對於每一次SQL操作,都要走一遍下面完整的流程:
1.TCP建立連接的三次握手(客戶端與 MySQL伺服器的連接基於TCP協議)
2.MySQL認證的三次我收
3.真正的SQL執行
4.MySQL的關閉
5.TCP的四次握手關閉
可以看出來,為了執行一條SQL,需要進行大量的初始化與關閉操作
使用資料庫連接池
如果使用資料庫連接池,那麼會 事先申請(初始化)好 相關的資料庫連接,然後在之後的SQL操作中會復用這些資料庫連接,操作結束之後資料庫也不會斷開連接,而是將資料庫對象放回到資料庫連接池中
資源重用:由於資料庫連接得到重用,避免了頻繁的創建、釋放連接引起的性能開銷,在減少系統消耗的基礎上,另一方面也增進了系統運行環境的平穩性(減少內存碎片以及資料庫臨時進程/線程的數量)。
更快的系統響應速度:資料庫連接池在初始化過程中,往往已經創建了若干資料庫連接置於池中備用。 此時連接的初始化工作均已完成。對於業務請求處理而言,直接利用現有可用連接,避免了從資料庫連接初始化和釋放過程的開銷,從而縮減了系統整體響應時間。
統一的連接管理,避免資料庫連接泄露:在較為完備的資料庫連接池實現中,可根據預先的連接佔用超時設定,強制收回被佔用連接。從而避免了常規資料庫連接操作中可能出現的資源泄露。
如果說你的伺服器CPU是4核i7的,連接池大小應該為((4*2)+1)=9
相關視頻推薦
90分鍾搞懂資料庫連接池技術|linux後台開發
《tcp/ip詳解卷一》: 150行代碼拉開協議棧實現的篇章
學習地址:C/C++Linux伺服器開發/後台架構師【零聲教育】-學習視頻教程-騰訊課堂
需要C/C++ Linux伺服器架構師學習資料加qun 812855908 獲取(資料包括 C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg 等),免費分享
源碼下載
下載方式:https://github.com/dongyusheng/csdn-code/tree/master/db_pool(Github中下載)
db_pool目錄下有兩個目錄,mysql_pool目錄為MySQL連接池代碼,redis_pool為redis連接池代碼
下面介紹mysql_pool
CDBConn解析
概念: 代表一個數據連接對象實例
相關成員:
m_pDBPool:該資料庫連接對象所屬的資料庫連接池
構造函數: 綁定自己所屬於哪個資料庫連接池
Init()函數: 創建資料庫連接句柄
CDBPool解析
概念:代表一個資料庫連接池
相關成員:
Init()函數:常見指定數量的資料庫實例句柄,然後添加到m_free_list中,供後面使用
GetDBConn()函數: 用於從空閑隊列中返回可以使用的資料庫連接句柄
RelDBConn()函數: 程序使用完該資料庫句柄之後,將句柄放回到空閑隊列中
測試之前,將代碼中的資料庫地址、埠、賬號密碼等改為自己的(代碼中有好幾處)
進入MySQL, 創建mysql_pool_test資料庫
進入到mysql_pool目錄下, 創建一個build目錄並進入 :
之後就會在目錄下生成如下的可執行文件
輸入如下兩條命令進行測試: 可以看到不使用資料庫連接池,整個操作耗時4秒左右;使用連接池之後,整個操作耗時2秒左右,提升了一倍
源碼下載
下面介紹redis_pool
測試
進入到redis_pool目錄下, 創建一個build目錄並進入 :
然後輸入如下的命令進行編譯
之後就會在目錄下生成如下的可執行文件
輸入如下的命令進行測試: 可以看到不使用資料庫連接池,整個操作耗時182ms;使用連接池之後,整個操作耗時21ms,提升了很多
進入redis,可以看到我們新建的key:
❹ RocketMQ源碼分析4:Broker處理消息流程
基於RocketMQ-4.9.0 版本分析rocketmq
1.接收和處理請求其實這里和NameServer處理請求的過程是一樣的。 在前面Broker啟動過程文章中,我們知道Broker啟動時,最終會啟動一個netty服務,可以接收讀寫請求。那麼這篇文章我們就來看看他是如何接收和處理請求的。
在前面分析Broker啟動的過程中,我們通過源碼看到,netty服務端啟動類會綁定很多ChannelHandler,有負責處理握手的,有負責處理心跳的,有負責處理連接的,也有負責讀寫的,其中NettyServerHandler就是負責讀寫的。 我們在簡單看下代碼:
//TODO: ServerBootstrap 是netty的啟動類ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())//TODO:綁定ip和埠.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),//TODO:處理心跳的ChannelHandlernew IdleStateHandler(0, 0, nettyServerConfig.()),//TODO:處理連接的 ,//TODO: 處理讀寫的 ChannelHandler (NettyServerHandler)serverHandler);}});broker開放10911埠用來與procer和consumer進行通信。 所以,NettyServerHandler?類就是我們分析的入口: 它是?NettyRemotingServer?類的內部類
@Sharableclass NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Overrideprotected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {processMessageReceived(ctx, msg);}}
> 看到它是不是很熟悉?沒錯,`NameServer`處理請求也是它(不過他們是分別new的不同對象)然後我們繼續看它的內部實現:```javapublic void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {//TODO:從處理器表中獲取Pair對象final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;final int opaque = cmd.getOpaque();if (pair != null) {//TODO:構建一個線程,當submit到線程池中時會執行//TODO: 先不要展開看這里,繼續往後看,會將該run 提交到線程池中Runnable run = new Runnable() {@Overridepublic void run() {try {//TODO:...省略部分代碼...... //TODO:核心if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();processor.asyncProcessRequest(ctx, cmd, callback);} else {NettyRequestProcessor processor = pair.getObject1();RemotingCommand response = processor.processRequest(ctx, cmd);callback.callback(response);}} catch (Throwable e) { //TODO:....省略catch......}}};//TODO: ....省略部分代碼.......try {//TODO:將上面創建的Runnable放入線程池中然後執行final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {//TODO:省略catch代碼.......}} else {//TODO:...省略else......}}總結一下就主要做兩件事,獲取處理器,然後調用處理器的處理方法。
獲取處理器
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;從處理器表processorTable中根據code獲取處理器,在上一篇文章Broker啟動-注冊處理器中,它會注冊處理器到這個處理器表中,這個code 就是用來區分業務場景的。比如生產者發送消息的code=RequestCode.SEND_MESSAGE,消費者拉取消息的code=RequestCode.PULL_MESSAGE。
我們這里先不關注code,總之就是從處理器表processorTable中獲取一個處理器。這里提一嘴,在分析NameServer處理請求的時候,它也會注冊處理器,不過它不會向這個處理器表中注冊處理器,而是使用一個默認的處理器。
調用處理器的處理方法
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) { AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1(); processor.asyncProcessRequest(ctx, cmd, callback);} else { NettyRequestProcessor processor = pair.getObject1(); RemotingCommand response = processor.processRequest(ctx, cmd); callback.callback(response);}在調用處理方法之前,我們要知道這個Pair對象是什麼? 在前面Broker啟動-注冊處理器一文中,我們發現,它會new 一個 Pair對象放入到處理器表中,作為Map的value,而Pair對象有兩個泛型T1,T2,分別對應著處理器和線程池。
public class Pair<T1, T2> { private T1 object1; private T2 object2; public Pair(T1 object1, T2 object2) { this.object1 = object1; this.object2 = object2; }}object1=業務處理器,object2=線程池 那麼接下來就是進入處理器開始處理業務邏輯了,那麼我們就以接收procer發送消息的處理器SendMessageProcessor為例來往下看 從類的繼承結構上看,它繼承了 AsyncNettyRequestProcessor, 處理器基本上都繼承了它,所以走的是非同步邏輯(也就是if邏輯)
SendMessageProcessor#asyncProcessRequest(..)
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {//TODO: 消費失敗,發起重試,則會來到這里case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {//TODO: 批量消息return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {//TODO: 普通消息發送就會走這里return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}接下來我們就具體分析Broker是如何一步一步將消息存儲起來的。
2.消息存儲處理procer發送消息的處理器是 SendMessageProcessor。
生產者可以發送普通消息,順序消息,延遲消息,事務消息,批量消息,我們就以普通消息為例來分析。
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {//TODO:預發送final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();//TODO:....省略部分代碼......//TODO:如果沒有指定queueid,則隨機指定一個if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}//TODO:構建消息體對象,保存topic,queueid,msg等內容MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);msgInner.setBody(body);//TODO:.......other info......msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));CompletableFuture<PutMessageResult> putMessageResult = null;Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (transFlag != null && Boolean.parseBoolean(transFlag)) {//TODO: 事務消息putMessageResult = this.brokerController.().asyncPrepareMessage(msgInner);} else {//TODO: 普通消息putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}
@Sharableclass NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {0那麼我們繼續點進去CommitLog的內部實現:
代碼是非常的多,還是只保留關注的部分
@Sharableclass NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {1代碼比較多,我們簡單總結下:
判斷是否為延遲消息,如果是延遲消息,則將topic替換為SCHEDULE_TOPIC_XXXX,將queueid替換為(延遲級別-1),將原始的topic和queueid保存到消息對象的Properties屬性中。
獲取最新的MappedFile文件對象。這個MappedFile在這里可以理解為是消息文件的邏輯映射,然後調用MappedFile對象的方法put消息.
提交刷盤請求
同步刷盤:只有在消息真正持久化至磁碟後RocketMQ的Broker端才會真正返回給Procer端一個成功的ACK響應。同步刷盤對MQ消息可靠性來說是一種不錯的保障,但是性能上會有較大影響,一般適用於金融業務應用該模式較多
非同步刷盤:能夠充分利用OS的PageCache的優勢,只要消息寫入PageCache即可將成功的ACK返回給Procer端。消息刷盤採用後台非同步線程提交的方式進行,降低了讀寫延遲,提高了MQ的性能和吞吐量。
所以我們繼續看第2點,MappedFile它是如何Put消息的:
@Sharableclass NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {2然後我們繼續看cb.doAppend(....)方法,看它如何存儲消息:
@Sharableclass NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {3這個方法的內容也比較多,我們還是簡單總結下:
設置消息的msgId(說明msgId是broker端生成的)
計算consumequeue的offset(在broker啟動時,會載入配置文件中的內容到這個Map中)
在消息分發時,會根據這個offset值按順序寫入到索引文件中。
判斷消息是否超過4M,如果超過4M則返回一個MESSAGE_SIZE_EXCEEDED錯誤碼。
單個消息大小限制是4M,procer端會check, broker端會check,這個值可以修改,但注意要修改兩端。
判斷當前文件是否還寫的下當前消息,如果寫不下,則新建一個MappedFile,再次寫入
初始化存儲空間,寫入一個消息體的全部內容,然後放入緩沖區。消息體內容大致如圖:
消息寫入到commitlog後,這條消息的處理流程就結束了。然後接下來就是消息分發(構建消息索引)
關於消息存儲,我們看下官網的文章闡述:
消息存儲整體架構
消息存儲架構圖中主要有下面三個跟消息存儲相關的文件構成。
(1) CommitLog:消息主體以及元數據的存儲主體,存儲Procer端寫入的消息主體內容,消息內容不是定長的。單個文件大小默認1G, 文件名長度為20位,左邊補零,剩餘為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當第一個文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序寫入日誌文件,當文件滿了,寫入下一個文件;
(2) ConsumeQueue:消息消費隊列,引入的目的主要是提高消息消費的性能,由於RocketMQ是基於主題topic的訂閱模式,消息消費是針對主題進行的,如果要遍歷commitlog文件中根據topic檢索消息是非常低效的。Consumer即可根據ConsumeQueue來查找待消費的消息。其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基於topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結構,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue文件採取定長設計,每一個條目共20個位元組,分別為8位元組的commitlog物理偏移量、4位元組的消息長度、8位元組tag hashcode,單個文件由30W個條目組成,可以像數組一樣隨機訪問每一個條目,每個ConsumeQueue文件大小約5.72M;
(3) IndexFile:IndexFile(索引文件)提供了一種可以通過key或時間區間來查詢消息的方法。Index文件的存儲位置是:$home/store/index${fileName},文件名fileName是以創建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設計為在文件系統中實現HashMap結構,故rocketmq的索引文件其底層實現為hash索引。
在上面的RocketMQ的消息存儲整體架構圖中可以看出,RocketMQ採用的是混合型的存儲結構,即為Broker單個實例下所有的隊列共用一個日誌數據文件(即為CommitLog)來存儲。RocketMQ的混合型存儲結構(多個Topic的消息實體內容都存儲於一個CommitLog中)針對Procer和Consumer分別採用了數據和索引部分相分離的存儲結構,Procer發送消息至Broker端,然後Broker端使用同步或者非同步的方式對消息刷盤持久化,保存至CommitLog中。只要消息被刷盤持久化至磁碟文件CommitLog中,那麼Procer發送的消息就不會丟失。正因為如