導航:首頁 > 源碼編譯 > rabbitmq源碼怎麼分析

rabbitmq源碼怎麼分析

發布時間:2023-08-10 12:49:05

① 求java學習路線圖

/*回答內容很長,能看完的少走一個月彎路,絕不抖機靈*/

提前預警:本文適合Java新手閱讀(老手可在評論區給下建議),希望大家看完能有所收獲。

廢話不多少了,先了解一下Java零基礎入門學習路線:

第一階段:JavaSE階段

變數、數據類型、運算符

控制語句

面向對象編程-基礎

面向對象編程-進階

異常機制

Java常用類

Wrapper包裝類

第二階段:資料庫

第三階段:JavaEE階段

第四階段:框架階段

第五階段:前後端分離階段

第六階段:微服務架構

第七階段:雲服務階段

② 在linux下安裝rabbitmq失敗怎麼解決

RabbitMQ 是由 LShift 提供的一個 Advanced Message Queuing Protocol (AMQP) 的開源實現,由以高性能、健壯以及可伸縮性出名的 Erlang 寫成,因此也是繼承了這些優點。
AMQP 里主要要說兩個組件:Exchange 和 Queue (在 AMQP 1.0 里還會有變動),如下圖所示,綠色的 X 就是 Exchange ,紅色的是 Queue ,這兩者都在 Server 端,又稱作 Broker ,這部分是 RabbitMQ 實現的,而藍色的則是客戶端,通常有 Procer 和 Consumer 兩種類型:

1:mq的安裝需要Erlang,所以首先下載Erlang,下載地址:http://www.erlang.org/download.html直接下載源碼,編譯安裝即可。
將下載好的tar包解壓編譯安裝,如下命令:
tar -zxvf otp_src_R16B03-1.tar.gz

cd otp_src_R16B03-1
./configure && make install

安裝過程中可能出現如下錯誤:
configure:error:
No curses library functions found
configure: error:/bin/sh'/home/niewf/software/erlang_R13B01/erts/configure'
failed for erts

解決方法:
yum list|grep ncurses
yum -y install ncurses-devel
yum install ncurses-devel

或者直接下載ncurses包編譯安裝。
下載地址:http://download.chinaunix.net/download/0008000/7242.shtml
tar zxvf ncurses.tar.gz #解壓縮並且釋放 文件包
cd ncurses #進入解壓縮的目錄(注意版本)
./configure #按照你的系統環境製作安裝配置文件
make #編譯源代碼並且編譯NCURSES庫
su root #切換到root用戶環境
make install #安裝編譯好的NCURSES庫

完成後繼續返回上一步操作。

2:安裝python,如果系統中python版本低於2.5的話需要升級python到2.6以上,具體可參考:http://gavinshaw.blog.51cto.com/385947/610585

3:安裝simplejson,直接下載simplejson源碼包編譯安裝即可,下載地址:https://pypi.python.org/pypi/simplejson/。
下載simplejson源碼包後,運行python setup.py install即可完成安裝。

4:安裝rabbit mq,下載地址:https://www.rabbitmq.com/install-generic-unix.html
下載後放入相應目錄解壓,進入%RABBITMQ_HOME%/sbin目錄下運行:./rabbitmq-server start即可啟動mq。
如果遇到如下錯誤,則參考http://leeon.me/a/rabbitmq-start-fail-note解決方案
ERROR: epmd error for host "xxx": address (cannot connect to host/port)
到此mq已經安裝完成。
在%RABBITMQ_HOME%/sbin目錄運行./rabbitmqctl status可查看當前mq狀態。
同時mq也提供了界面查看當前mq狀態,但是需要啟用該插件功能,運行如下命令:
rabbitmq-plugins enable rabbitmq_management,然後在瀏覽器中輸入:http://host-name:15672/#/即可訪問,頁面結果如下:

③ Spring整合rabbitmq實踐(一):基礎

Spring整合rabbitmq實踐(二):擴展
Spring整合rabbitmq實踐(三):源碼

procer:消息生產者;

consumer:消息消費者;

queue:消息隊列;

exchange:接收procer發送的消息按照binding規則轉發給相應的queue;

binding:exchange與queue之間的關系;

virtualHost:每個virtualHost持有自己的exchange、queue、binding,用戶只能在virtualHost粒度控制許可權。

fanout:

群發到所有綁定的queue;

direct:

根據routing key routing到相應的queue,routing不到任何queue的消息扔掉;可以不同的key綁到同一個queue,也可以同一個key綁到不同的queue;

topic:

類似direct,區別是routing key是由一組以「.」分隔的單片語成,可以有通配符,「*」匹配一個單詞,「#」匹配0個或多個單詞;

headers:

根據arguments來routing。

arguments為一組key-value對,任意設置。

「x-match」是一個特殊的key,值為「all」時必須匹配所有argument,值為「any」時只需匹配任意一個argument,不設置默認為「all」。

通過以下配置,可以獲得最基礎的發送消息到queue,以及從queue接收消息的功能。

這個包同時包含了一些其它的包:spring-context、spring-tx、spring-web、spring-messaging、spring-retry、spring-amqp、amqp-client,如果想單純一點,可以單獨引入。

最主要的是以下幾個包,

spring-amqp:

spring-rabbit:

amqp-client:

個人理解就是,spring-amqp是spring整合的amqp,spring-rabbit是spring整合的rabbitmq(rabbitmq是amqp的一個實現,所以可能spring-rabbit也是類似關系),amqp-client提供操作rabbitmq的java api。

目前最新的是2.0.5.RELEASE版本。如果編譯報錯,以下信息或許能有所幫助:

(1)

解決方案:spring-amqp版本改為2.0.5.RELEASE。

(2)

解決方案:spring-context版本改為5.0.7.RELEASE。

(3)

解決方案:spring-core版本改為5.0.7.RELEASE。

(4)

解決方案:spring-beans版本改為5.0.7.RELEASE。

(5)

解決方案:spring-aop版本改為5.0.7.RELEASE。

總之,需要5.0.7.RELEASE版本的spring,及相匹配版本的amqp-client。

後面所講的這些bean配置,spring-amqp中都有默認配置,如果不需要修改默認配置,則不用人為配置這些bean。後面這些配置也沒有涉及到所有的屬性。

這里的ConnectionFactory指的是spring-rabbit包下面的ConnectionFactory介面,不是amqp-client包下面的ConnectionFactory類。

上面這個bean是spring-amqp的核心,不論是發送消息還是接收消息都需要這個bean,下面描述一下裡面這些配置的含義。

setAddresses :設置了rabbitmq的地址、埠,集群部署的情況下可填寫多個,「,」分隔。

setUsername :設置rabbitmq的用戶名。

setPassword :設置rabbitmq的用戶密碼。

setVirtualHost :設置virtualHost。

setCacheMode :設置緩存模式,共有兩種, CHANNEL 和 CONNECTION 模式。

CHANNEL 模式,程序運行期間ConnectionFactory會維護著一個Connection,所有的操作都會使用這個Connection,但一個Connection中可以有多個Channel,操作rabbitmq之前都必須先獲取到一個Channel,否則就會阻塞(可以通過setChannelCheckoutTimeout()設置等待時間),這些Channel會被緩存(緩存的數量可以通過setChannelCacheSize()設置);

CONNECTION 模式,這個模式下允許創建多個Connection,會緩存一定數量的Connection,每個Connection中同樣會緩存一些Channel,除了可以有多個Connection,其它都跟CHANNEL模式一樣。

這里的Connection和Channel是spring-amqp中的概念,並非rabbitmq中的概念,官方文檔對Connection和Channel有這樣的描述:

關於 CONNECTION 模式中,可以存在多個Connection的使用場景,官方文檔的描述:

setChannelCacheSize :設置每個Connection中(注意是每個Connection)可以緩存的Channel數量,注意只是緩存的Channel數量,不是Channel的數量上限,操作rabbitmq之前(send/receive message等)要先獲取到一個Channel,獲取Channel時會先從緩存中找閑置的Channel,如果沒有則創建新的Channel,當Channel數量大於緩存數量時,多出來沒法放進緩存的會被關閉。

注意,改變這個值不會影響已經存在的Connection,隻影響之後創建的Connection。

setChannelCheckoutTimeout :當這個值大於0時, channelCacheSize 不僅是緩存數量,同時也會變成數量上限,從緩存獲取不到可用的Channel時,不會創建新的Channel,會等待這個值設置的毫秒數,到時間仍然獲取不到可用的Channel會拋出AmqpTimeoutException異常。

同時,在 CONNECTION 模式,這個值也會影響獲取Connection的等待時間,超時獲取不到Connection也會拋出AmqpTimeoutException異常。

setPublisherReturns、setPublisherConfirms :procer端的消息確認機制(confirm和return),設為true後開啟相應的機制,後文詳述。

官方文檔描述publisherReturns設為true打開return機制,publisherComfirms設為true打開confirm機制,但測試結果(2.0.5.RELEASE版本)是,任意一個設為true,兩個都會打開。

addConnectionListener、addChannelListener、setRecoveryListener :添加或設置相應的Listener,後文詳述。

setConnectionCacheSize :僅在 CONNECTION 模式使用,設置Connection的緩存數量。

setConnectionLimit :僅在 CONNECTION 模式使用,設置Connection的數量上限。

上面的bean配置,除了需要注入的幾個listener bean以外,其它設置的都是其默認值(2.0.5.RELEASE版本),後面的bean示例配置也是一樣,部分屬性不同版本的默認值可能有所不同。

一般不用配置這個bean,這里簡單提一下。

這個ConnectionFactory是rabbit api中的ConnectionFactory類,這裡面是連接rabbitmq節點的Connection配置。

如果想修改這些配置,可以按如下方式配置:

consumer端如果通過@RabbitListener註解的方式接收消息,不需要這個bean。

不建議直接通過ConnectionFactory獲取Channel操作rabbitmq,建議通過amqpTemplate操作。

setConnectionFactory :設置spring-amqp的ConnectionFactory。

setRetryTemplate :設置重試機制,詳情見後文。

setMessageConverter :設置MessageConverter,用於java對象與Message對象(實際發送和接收的消息對象)之間的相互轉換,詳情見後文。

setChannelTransacted :打開或關閉Channel的事務,關於amqp的事務後文描述。

setReturnCallback、setConfirmCallback :return和confirm機制的回調介面,後文詳述。

setMandatory :設為true使ReturnCallback生效。

這個bean僅在consumer端通過@RabbitListener註解的方式接收消息時使用,每一個@RabbitListener註解的方法都會由這個創建一個MessageListenerContainer,負責接收消息。

setConnectionFactory :設置spring-amqp的ConnectionFactory。

setMessageConverter :對於consumer端,MessageConverter也可以在這里配置。

setAcknowledgeMode :設置consumer端的應答模式,共有三種:NONE、AUTO、MANUAL。

NONE,無應答,這種模式下rabbitmq默認consumer能正確處理所有發出的消息,所以不管消息有沒有被consumer收到,有沒有正確處理都不會恢復;

AUTO,由Container自動應答,正確處理發出ack信息,處理失敗發出nack信息,rabbitmq發出消息後將會等待consumer端的應答,只有收到ack確認信息才會把消息清除掉,收到nack信息的處理辦法由setDefaultRequeueRejected()方法設置,所以在這種模式下,發生錯誤的消息是可以恢復的。

MANUAL,基本同AUTO模式,區別是需要人為調用方法給應答。

setConcurrentConsumers :設置每個MessageListenerContainer將會創建的Consumer的最小數量,默認是1個。

setMaxConcurrentConsumers :設置每個MessageListenerContainer將會創建的Consumer的最大數量,默認等於最小數量。

setPrefetchCount :設置每次請求發送給每個Consumer的消息數量。

setChannelTransacted :設置Channel的事務。

setTxSize :設置事務當中可以處理的消息數量。

setDefaultRequeueRejected :設置當rabbitmq收到nack/reject確認信息時的處理方式,設為true,扔回queue頭部,設為false,丟棄。

setErrorHandler :實現ErrorHandler介面設置進去,所有未catch的異常都會由ErrorHandler處理。

AmqpTamplate裡面有下面幾個方法可以向queue發送消息:

這里,exchange必須存在,否則消息發不出去,會看到錯誤日誌,但不影響程序運行:

Message是org.springframework.amqp.core.Message類,spring-amqp發送和接收的都是這個Message。

從Message類源碼可以看到消息內容放在byte[]裡面,MessageProperties對象包含了非常多的一些其它信息,如Header、exchange、routing key等。

這種方式,需要將消息內容(String,或其它Object)轉換為byte[],示例:

也可以直接調用下面幾個方法,Object將會自動轉為Message對象發送:

有兩種方法接收消息:

1.polling consumer,輪詢調用方法一次獲取一條;

2.asynchronous consumer,listener非同步接收消息。

polling consumer

直接通過AmqpTemplate的方法從queue獲取消息,有如下方法:

如果queue裡面沒有消息,會立刻返回null;傳入timeoutMillis參數後可阻塞等待一段時間。

如果想直接從queue獲取想要的java對象,可調用下面這一組方法:

後面4個方法是帶泛型的,示例如下:

使用這四個方法需要配置org.springframework.amqp.support.converter.SmartMessageConverter,這是一個介面,Jackson2JsonMessageConverter已經實現了這個介面,所以只要將Jackson2JsonMessageConverter設置到RabbitTemplate中即可。

asynchronous consumer

有多種方式可以實現,詳情參考官方文檔。

最簡單的實現方式是@RabbitListener註解,示例:

這里接收消息的對象用的是Message,也可以是自定義的java對象,但調用Converter轉換失敗會報錯。

註解上指定的queue必須是已經存在並且綁定到某個exchange的,否則會報錯:

如果在@RabbitListener註解中指明binding信息,就能自動創建queue、exchange並建立binding關系。

direct和topic類型的exchange需要routingKey,示例:

fanout類型的exchange,示例:

2.0版本之後,可以指定多個routingKey,示例:

並且支持arguments屬性,可用於headers類型的exchange,示例:

@Queue有兩個參數exclusive和autoDelete順便解釋一下:

exclusive,排他隊列,只對創建這個queue的Connection可見,Connection關閉queue刪除;

autoDelete,沒有consumer對這個queue消費時刪除。

對於這兩種隊列,rable=true是不起作用的。

另外,如果註解申明的queue和exchange及binding關系都已經存在,但與已存在的設置不同,比如,已存在的exchange的是direct類型,這里嘗試改為fanout類型,結果是不會有任何影響,不論是修改或者新增參數都不會生效。

如果queue存在,exchange存在,但沒有binding,那麼程序啟動後會自動建立起binding關系。

閱讀全文

與rabbitmq源碼怎麼分析相關的資料

熱點內容
安卓為什麼免費使用 瀏覽:397
加密貨幣都有哪些平台 瀏覽:625
python和matlab難度 瀏覽:388
python爬蟲很難學么 瀏覽:572
小米解壓積木可以組成什麼呢 瀏覽:816
為什麼滴滴出行app還能用 瀏覽:564
怎麼升級手機android 瀏覽:922
php權威編程pdf 瀏覽:994
扣扣加密技巧 瀏覽:720
蘋果如何創建伺服器錯誤 瀏覽:495
軟考初級程序員大題分值 瀏覽:474
js壓縮視頻文件 瀏覽:578
linux如何通過命令創建文件 瀏覽:991
應用加密app還能訪問應用嘛 瀏覽:434
安卓怎麼用支付寶交違章罰款 瀏覽:665
php面向對象的程序設計 瀏覽:504
數據挖掘演算法書籍推薦 瀏覽:894
投訴聯通用什麼app 瀏覽:152
web伺服器變更ip地址 瀏覽:956
java正則表達式驗證郵箱 瀏覽:362