Ⅰ 春季角ngstomp WebSocket控制器不執行問題,怎麼解決
兩年前,客戶端與伺服器端的全雙工雙向通信作為一個很重要的功能被納入到WebSocket RFC 6455協議中。在HTML5中,WebSocket已經成為一個流行詞,大家對這個功能賦予很多構想,很多時候甚至是不切實際的期望。在這篇文章中,我們將重點介紹下如何通過Spring Framework 4.0來構建一個基於 STMOP協議的WebSocket形式的應用。該應用通過 Message Broker向用戶廣播消息,並使用SockJS作為瀏覽器前端通信代碼庫。
傳統的Socket交互需要很多項技術的支持,包括java applet,XMLHttpRequest,Adobe Flash, ActiveXObject, 各種 Comet和服務端發送事件等等。相較於通過如此繁雜的技術來實現Socket交互,WebSockt就顯得簡潔易用得多了。但是別高興得太 早,無論看起來如何誘人,WebSocket現在都還只不過是一個基礎而已。即便它確實為WEB的雙向通信設定了一些重要的標准,但也還只是第一步。要成 為一個成熟的Socket交互方案,WebSocket還需要解決網路代理設置和瀏覽器支持等一系列的問題。
WebSocket和REST對比
當你開始接觸WebSocket應用時,為了找尋你可能會遇到的問題的答案,你就要快速地瀏覽一下這篇文章,基於不同的應用類型其中包含了關於 WebSocket是否會替代REST的有趣而不可思議的討論。該類型(WebSocket)應用相對於偶爾工作的應用(web郵件,新聞更新等)擁有更 強大的能力來面對全天候,實時交互(比如游戲、金融、協作化、可視化等)的復雜情況,無論哪種方式,他都是通常人們所熟悉的,而且REST是我們目前通用 的web應用構建風格.在此並非想通過這樣的對比來抵制這些創新,還是讓我們看看能從中學到什麼吧,而這2個要點還是需要我們去觀察才能發現的.
首先,與其他相比,REST是一種在使用無狀態和超媒體(鏈接)時,支持許多URL和少量的HTTP方法的架構方式。與之對應的,WebSocket是一個完全不同的消息模式的架構方式。它不僅僅是一個現有AJAX技術的替代品,更是一個事件驅動的、被動的方法。
第二,REST是基於HTTP的,是在TCP之上建立的一個應用協議,能夠提供給我們用於建立應用邏輯的URLs,HTTP方法,請求或者響應頭,狀態碼,和一些其他的關鍵件。相比之下,WebSocket是一個TCP之上的一個簡單層。它就是一個沒有被定義內容的可以被分解成消息的位元組流。在不對一個消息內容做出假設的情況下,一個框架能夠做的很少。當應用做出那樣的假定之後,他們就會圍繞這些假定來創建自己的框架。
WebSocket協議定義了sub-protocols的使用(即更高級別的協議)但沒有引用它。無論哪種方式,應用都需要決定使用什麼消息格式——自定義的、特定框架的或標準的。
總之,一個WebSocket式的應用意味著一個事件驅動型的、響應式的消息傳遞架構。此外工作在WebSocket層次對於大多數應用程序都比較底層,就像現在的大多數Web應用不直接在套接字上編程。
通向實時網路之路
我們可以從現有的各種框架里看到好多使用高等消息API,也在底層使用WebSocket,並在必要時依賴一些其他的備選項(比如HTTP流,長輪詢 等)。即便是在今天,也需要依賴於WebSocket和非WebSocket的混合技術。關鍵區別就在於框架是否提供一個單獨的API允許在必要時透明的 回退到非WebSocket傳輸。
一些框架,比如Socket.io和Vert.x提供輕量級的應用組件為事件和消息指定處理者。另一部分,像CometD,通過內部建立一個消息代理來為綁定和接收消息提供通道。像RabbitMQ或者ActiveMQ也提供了直接從瀏覽器獲取消息代理的選項。當遇到通信架構時,消息代理模式適合用於構造規模應用的。
Spring4.0的方法
Spring4.0 ——也就是當前的候選版本,預計GA於2013年12月發布——一個目標就是為Websocket類型的應用提供支持。它不僅在基於JSR-356容器之 上提供Websocket API 表現良好,而且也為那些不支持或者不允許使用Websocket的瀏覽器和網路提供了一些候選項。更重要的是,它為在網路應用中構建Websocket形 式的消息架構提供了基礎。
我們決定使用SockJS protocol作為候選項。它能為這些候選項,提供著最好和最廣泛的傳輸方式。
對於基於WebSocket模式的消息驅動的架構來說,我們也查看了許多現有的方法,我們喜歡這種真正的消息代理的處理能力,也同樣喜歡一個網頁應用使用 中心處理模塊的方式。畢竟,我們必須採用一個消息驅動的架構,但同時我們也是網路開發者,更習慣建立網頁應用,所以結果不能和我們已知的相差太大。
第一步就是選擇一個消息的格式。有許多簡單消息協議諸如STOMP,MQTT和WAMP。這些都適合應用於網頁客戶端,並為基本的消息模式提供支持。我們 選擇了STOMP,因為它的消息格式是基於HTTP模塊化的,同時它也能被廣泛的支持。然而,我們的處理模塊並沒有過分依賴於STOMP,這個處理模塊也 能被擴展成支持其他簡單協議。
使用STOMP協議能夠讓我們站在WebSocket的肩膀上。它能夠提供一種方法來解析一個消息應該傳遞給誰,我們又對接收什麼樣的消息感興趣。它允許我們像是使用廣播消息代理的插件一樣使用可用的客戶端庫文件,比如stomp.js和msg.js。這就是明顯的優勢。
Spring4提供了STOMP支持。通過兩三行的配置,你就可以在網路客戶端中把它當做一個輕量級的消息代理。它能夠不需要任何伺服器代碼就自動處理綁 定的事件,並允許控制器方法處理進來的消息和綁定事件。這與如何通過Spring MVC映射HTTP請求到控制器方法是相似的。實際上,一個spring MVC控制器能夠被擴展成基於WebSocket的接收STOMP消息的形式。
@Controller public class GreetingController { @RequestMapping(value=」/greeting」, method=POST) public void httpGreet(String text) { // ... } @MessageMapping("/greeting") public void stompGreet(String text) { // ... } }
使用一個全功能的消息代理
在一個全功能的的消息代理中做一個插件也是很容易的。舉個例子,RabbitMQ(或者其他STOMP消息代理),能夠被用來處理客戶端廣播消息的綁定。 在這個場景中,Spring仍然是處於網路客戶端連接和交換數據的網路應用層。同時,它也當做一個網關來為RabbitMQ服務,允許消息從應用流向 RabbitMQ,接著轉發給綁定此消息的客戶端。下面的流程圖就是描述這種路徑的:
這個路徑闡述了運行在多伺服器和雲環境中的大量應用實例能夠通過RabbitMQ服務廣播到達所有連接的客戶端,而不論此時客戶端連接的是哪一個應用實例。此外,也很容易從HTTP請求處理方法廣播消息到連接的客戶端或者應用的其他部分。
如需更詳盡的技術概覽說明,請移步spring.io上的M2 blog post,運行股票投資組合樣例,或者瀏覽spring開發頻道的在線論壇。我們最近發布了RC1候選版。如果你有一個應用的想法,現在是努力去實現和提供反饋絕佳時機。
如果你恰好在倫敦區域或者很容易到達這里,在11月的14、15日兩天,這里有一個spring 交流會。屆時會有spring關鍵工程師代表出席,並會發布最偉大的spring4.0框架和我們支持WebSocket的STOMP協議。
Ⅱ 請用白話講解ActiveMQ的用途
用途就是用來處理消息,也就是處理JMS的。消息隊列在大型電子商務類網站,如京東、淘寶、去哪兒等網站有著深入的應用,隊列的主要作用是消除高並發訪問高峰,加快網站的響應速度。
在不使用消息隊列的情況下,用戶的請求數據直接寫入資料庫,高發的情況下,會對資料庫造成巨大的壓力,同時也使得系統響應延遲加劇,但使用隊列後,用戶的請求發給隊列後立即返回。
例如:不能直接給用戶提示訂單提交成功,京東上提示:「您提交了訂單,請等待系統確認」再由消息隊列的消費者進程從消息隊列中獲取數據,非同步寫入資料庫。
由於消息隊列的服務處理速度遠快於資料庫,因此用戶的響應延遲可得到有效改善。
ActiveMQ主要有以下幾種使用場景
1、非同步調用。
2、一對多通信。
3、做多個系統的集成、同構、異構。
4、作為RPC的替代。
5、多個應用相互解耦。
6、作為事件驅動架構的幕後支撐。
7、為了提高系統的可伸縮性。
Ⅲ libevent 支持udp嗎
1、下載libevent源碼。
2、用VC編譯,一般編譯成靜態的。
3、在項目中配置libevent庫。
4、引用。 你可以按著以上步驟來。應該沒問題。
Ⅳ php 如何使用apache apollo .需要把手機客戶端發送過來的數據,轉存到mysql中。
insert into ..........
Ⅳ stomp 和 stamp 區別
1、讀音不同
stomp 讀音:英[stɒmp] 美[stɑ:mp]
stamp 讀音:英[stæmp] 美[stæmp]
2、意思不同
stomp的基本含義的「沉重地走」(walk heavily),(常指因生氣而)跺著腳走,邁著重重的步伐。stamp 用於比喻可表示「頓足」,指在受某種心理因素或外界刺激的情況下用腳用力地在地上踩。引申還可指「踏平」「銘記」等。stamp名詞釋義還有郵票、圖章、印的意思。
3、這兩句話可以翻譯為:
1、我使勁踩了一下他的腳,小聲說「閉嘴!」
2、他轉過身,背對他們,跺著腳上了山。
不能調換,因為一個是踩,一個是跺著腳。
(5)php安裝stomp擴展擴展閱讀:
近義詞
一、tread 讀音:英 [tred] 美 [trɛd]
vi.踩,踏;行走;交尾 vt.踩成;踏出;步行於;踩(爛)
n.踏,踩,走;交尾;(樓梯的)踏板;輪胎接觸地面的部分
第三人稱單數: treads 現在分詞: treading 過去式: trod 過去分詞: trodden
例句:
1、Oh, sorry, I didn't mean to tread on your foot...
哦,對不起,我不是故意踩你腳的。
2、Don't tread on the crops.
不要踩莊稼。
二、trample 讀音:英 [ˈtræmpl] 美 [ˈtræmpəl]
vt.踐踏;蹂躪;無視,蔑視;侵犯,傷害
vi.踐踏,重重地踩;腳步沉重地走 n.踐踏;踐踏聲
第三人稱單數: tramples 現在分詞: trampling 過去式: trampled 過去分詞: trampled
例句:
1、The campers had trampled the corn down.
野營的人踐踏了莊稼。
2、Please don't trample on the azaleas
請勿踩踏杜鵑花。
Ⅵ 消息隊列原理及選型
消息隊列(Message Queue)是一種進程間通信或同一進程的不同線程間的通信方式。
Broker(消息伺服器)
Broker的概念來自與Apache ActiveMQ,通俗的講就是MQ的伺服器。
Procer(生產者)
業務的發起方,負責生產消息傳輸給broker
Consumer(消費者)
業務的處理方,負責從broker獲取消息並進行業務邏輯處理
Topic(主題)
發布訂閱模式下的消息統一匯集地,不同生產者向topic發送消息,由MQ伺服器分發到不同的訂閱 者,實現消息的廣播
Queue(隊列)
PTP模式下,特定生產者向特定queue發送消息,消費者訂閱特定的queue完成指定消息的接收。
Message(消息體)
根據不同通信協議定義的固定格式進行編碼的數據包,來封裝業務數據,實現消息的傳輸
點對點模型用於消息生產者和消息消費者之間點到點的通信。
點對點模式包含三個角色:
每個消息都被發送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,可以放在內存 中也可以持久化,直到他們被消費或超時。
特點:
發布訂閱模型包含三個角色:
多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
特點:
AMQP即Advanced Message Queuing Protocol,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP 的主要特徵是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
優點:可靠、通用
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通訊協議,有可能成為物聯網的重要組成部分。該協議支持所有平台,幾乎可以把所有聯網物品和外部連接起來,被用來當做感測器和致動器(比如通過Twitter讓房屋聯網)的通信協議。
優點:格式簡潔、佔用帶寬小、移動端通信、PUSH、嵌入式系統
STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息協議,是一種為MOM(Message Oriented Middleware,面向消息的中間件)設計的簡單文本協議。STOMP提供一個可互操作的連接格式,允許客戶端與任意STOMP消息代理(Broker)進行交互。
優點:命令模式(非topicqueue模式)
XMPP(可擴展消息處理現場協議,Extensible Messaging and Presence Protocol)是基於可擴展標記語言(XML)的協議,多用於即時消息(IM)以及在線現場探測。適用於伺服器之間的准即時操作。核心是基於XML流傳輸,這個協議可能最終允許網際網路用戶向網際網路上的其他任何人發送即時消息,即使其操作系統和瀏覽器不同。
優點:通用公開、兼容性強、可擴展、安全性高,但XML編碼格式佔用帶寬大
RabbitMQ 是實現 AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。 RabbitMQ 主要是為了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者無法快速消費,那麼需要一個中間層。保存這個數據。
RabbitMQ 是一個開源的 AMQP 實現,伺服器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
Channel(通道)
道是兩個管理器之間的一種單向點對點的的通信連接,如果需要雙向交流,可以建立一對通道。
Exchange(消息交換機)
Exchange類似於數據通信網路中的交換機,提供消息路由策略。
RabbitMq中,procer不是通過信道直接將消息發送給queue,而是先發送給Exchange。一個Exchange可以和多個Queue進行綁定,procer在傳遞消息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由演算法,將消息路由給指定的queue。和Queue一樣,Exchange也可設置為持久化,臨時或者自動刪除。
Exchange有4種類型:direct(默認),fanout, topic, 和headers。
不同類型的Exchange轉發消息的策略有所區別:
Binding(綁定)
所謂綁定就是將一個特定的 Exchange 和一個特定的 Queue 綁定起來。Exchange 和Queue的綁定可以是多對多的關系。
Routing Key(路由關鍵字)
exchange根據這個關鍵字進行消息投遞。
vhost(虛擬主機)
在RabbitMq server上可以創建多個虛擬的message broker,又叫做virtual hosts (vhosts)。每一個vhost本質上是一個mini-rabbitmq server,分別管理各自的exchange,和bindings。vhost相當於物理的server,可以為不同app提供邊界隔離,使得應用安全的運行在不同的vhost實例上,相互之間不會干擾。procer和consumer連接rabbit server需要指定一個vhost。
假設P1和C1注冊了相同的Broker,Exchange和Queue。P1發送的消息最終會被C1消費。
基本的通信流程大概如下所示:
Consumer收到消息時需要顯式的向rabbit broker發送basic。ack消息或者consumer訂閱消息時設置auto_ack參數為true。
在通信過程中,隊列對ACK的處理有以下幾種情況:
即消息的Ackownledge確認機制,為了保證消息不丟失,消息隊列提供了消息Acknowledge機制,即ACK機制,當Consumer確認消息已經被消費處理,發送一個ACK給消息隊列,此時消息隊列便可以刪除這個消息了。如果Consumer宕機/關閉,沒有發送ACK,消息隊列將認為這個消息沒有被處理,會將這個消息重新發送給其他的Consumer重新消費處理。
消息的收發處理支持事務,例如:在任務中心場景中,一次處理可能涉及多個消息的接收、處理,這應該處於同一個事務范圍內,如果一個消息處理失敗,事務回滾,消息重新回到隊列中。
消息的持久化,對於一些關鍵的核心業務來說是非常重要的,啟用消息持久化後,消息隊列宕機重啟後,消息可以從持久化存儲恢復,消息不丟失,可以繼續消費處理。
fanout 模式
模式特點:
direct 模式
任何發送到Direct Exchange的消息都會被轉發到routing_key中指定的Queue。
如果一個exchange 聲明為direct,並且bind中指定了routing_key,那麼發送消息時需要同時指明該exchange和routing_key。
簡而言之就是:生產者生成消息發送給Exchange, Exchange根據Exchange類型和basic_publish中的routing_key進行消息發送 消費者:訂閱Exchange並根據Exchange類型和binding key(bindings 中的routing key) ,如果生產者和訂閱者的routing_key相同,Exchange就會路由到那個隊列。
topic 模式
前面講到direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。
topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中,但這里的匹配規則有些不同。
它約定:
以上圖中的配置為例,routingKey=」quick.orange.rabbit」的消息會同時路由到Q1與Q2,routingKey=」lazy.orange.fox」的消息會路由到Q1,routingKey=」lazy.brown.fox」的消息會路由到Q2,routingKey=」lazy.pink.rabbit」的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=」quick.brown.fox」、routingKey=」orange」、routingKey=」quick.orange.male.rabbit」的消息將會被丟棄,因為它們沒有匹配任何bindingKey。
RabbitMQ,部署分三種模式:單機模式,普通集群模式,鏡像集群模式。
普通集群模式
多台機器部署,每個機器放一個rabbitmq實例,但是創建的queue只會放在一個rabbitmq實例上,每個實例同步queue的元數據。
如果消費時連的是其他實例,那個實例會從queue所在實例拉取數據。這就會導致拉取數據的開銷,如果那個放queue的實例宕機了,那麼其他實例就無法從那個實例拉取,即便開啟了消息持久化,讓rabbitmq落地存儲消息的話,消息不一定會丟,但得等這個實例恢復了,然後才可以繼續從這個queue拉取數據, 這就沒什麼高可用可言,主要是提供吞吐量 ,讓集群中多個節點來服務某個queue的讀寫操作。
鏡像集群模式
queue的元數據和消息都會存放在多個實例,每次寫消息就自動同步到多個queue實例里。這樣任何一個機器宕機,其他機器都可以頂上,但是性能開銷太大,消息同步導致網路帶寬壓力和消耗很重,另外,沒有擴展性可言,如果queue負載很重,加機器,新增的機器也包含了這個queue的所有數據,並沒有辦法線性擴展你的queue。此時,需要開啟鏡像集群模式,在rabbitmq管理控制台新增一個策略,將數據同步到指定數量的節點,然後你再次創建queue的時候,應用這個策略,就會自動將數據同步到其他的節點上去了。
Kafka 是 Apache 的子項目,是一個高性能跨語言的分布式發布/訂閱消息隊列系統(沒有嚴格實現 JMS 規范的點對點模型,但可以實現其效果),在企業開發中有廣泛的應用。高性能是其最大優勢,劣勢是消息的可靠性(丟失或重復),這個劣勢是為了換取高性能,開發者可以以稍降低性能,來換取消息的可靠性。
一個Topic可以認為是一類消息,每個topic將被分成多個partition(區),每個partition在存儲層面是append log文件。任何發布到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型數字,它是唯一標記一條消息。它唯一的標記一條消息。kafka並沒有提供其他額外的索引機制來存儲offset,因為在kafka中幾乎不允許對消息進行「隨機讀寫」。
Kafka和JMS(Java Message Service)實現(activeMQ)不同的是:即使消息被消費,消息仍然不會被立即刪除。日誌文件將會根據broker中的配置要求,保留一定的時間之後刪除;比如log文件保留2天,那麼兩天後,文件會被清除,無論其中的消息是否被消費。kafka通過這種簡單的手段,來釋放磁碟空間,以及減少消息消費之後對文件內容改動的磁碟IO開支。
對於consumer而言,它需要保存消費消息的offset,對於offset的保存和使用,有consumer來控制;當consumer正常消費消息時,offset將會"線性"的向前驅動,即消息將依次順序被消費。事實上consumer可以使用任意順序消費消息,它只需要將offset重置為任意值。(offset將會保存在zookeeper中,參見下文)
kafka集群幾乎不需要維護任何consumer和procer狀態信息,這些信息有zookeeper保存;因此procer和consumer的客戶端實現非常輕量級,它們可以隨意離開,而不會對集群造成額外的影響。
partitions的設計目的有多個。最根本原因是kafka基於文件存儲。通過分區,可以將日誌內容分散到多個server上,來避免文件尺寸達到單機磁碟的上限,每個partiton都會被當前server(kafka實例)保存;可以將一個topic切分多任意多個partitions,來消息保存/消費的效率。此外越多的partitions意味著可以容納更多的consumer,有效提升並發消費的能力。(具體原理參見下文)。
一個Topic的多個partitions,被分布在kafka集群中的多個server上;每個server(kafka實例)負責partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(replicas),每個partition將會被備份到多台機器上,以提高可用性。
基於replicated方案,那麼就意味著需要對多個備份進行調度;每個partition都有一個server為"leader";leader負責所有的讀寫操作,如果leader失效,那麼將會有其他follower來接管(成為新的leader);follower只是單調的和leader跟進,同步消息即可。由此可見作為leader的server承載了全部的請求壓力,因此從集群的整體考慮,有多少個partitions就意味著有多少個"leader",kafka會將"leader"均衡的分散在每個實例上,來確保整體的性能穩定。
Procers
Procer將消息發布到指定的Topic中,同時Procer也能決定將此消息歸屬於哪個partition;比如基於"round-robin"方式或者通過其他的一些演算法等。
Consumers
本質上kafka只支持Topic。每個consumer屬於一個consumer group;反過來說,每個group中可以有多個consumer。發送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費。
如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會在consumers之間負載均衡。
如果所有的consumer都具有不同的group,那這就是"發布-訂閱";消息將會廣播給所有的消費者。
在kafka中,一個partition中的消息只會被group中的一個consumer消費;每個group中consumer消息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的消息。kafka只能保證一個partition中的消息被某個consumer消費時,消息是順序的。事實上,從Topic角度來說,消息仍不是有序的。
Kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,否則將意味著某些consumer將無法得到消息。
Guarantees
Kafka就比較適合高吞吐量並且允許少量數據丟失的場景,如果非要保證「消息可靠傳輸」,可以使用JMS。
Kafka Procer 消息發送有兩種方式(配置參數 procer.type):
對於同步方式(procer.type=sync)?Kafka Procer 消息發送有三種確認方式(配置參數 acks):
kafka的設計初衷是希望作為一個統一的信息收集平台,能夠實時的收集反饋信息,並需要能夠支撐較大的數據量,且具備良好的容錯能力。
持久性
kafka使用文件存儲消息,這就直接決定kafka在性能上嚴重依賴文件系統的本身特性。且無論任何OS下,對文件系統本身的優化幾乎沒有可能。文件緩存/直接內存映射等是常用的手段。因為kafka是對日誌文件進行append操作,因此磁碟檢索的開支是較小的;同時為了減少磁碟寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁碟,這樣減少了磁碟IO調用的次數。
性能
需要考慮的影響性能點很多,除磁碟IO之外,我們還需要考慮網路IO,這直接關繫到kafka的吞吐量問題。kafka並沒有提供太多高超的技巧;對於procer端,可以將消息buffer起來,當消息的條數達到一定閥值時,批量發送給broker;對於consumer端也是一樣,批量fetch多條消息。不過消息量的大小可以通過配置文件來指定。對於kafka broker端,似乎有個sendfile系統調用可以潛在的提升網路IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域即可,而無需進程再次和交換。 其實對於procer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用消息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網路IO更應該需要考慮。可以將任何在網路上傳輸的消息都經過壓縮。kafka支持gzip/snappy等多種壓縮方式。
生產者
負載均衡: procer將會和Topic下所有partition leader保持socket連接;消息由procer直接通過socket發送到broker,中間不會經過任何「路由層「。事實上,消息被路由到哪個partition上,有procer客戶端決定。比如可以採用「random「「key-hash「「輪詢「等,如果一個topic中有多個partitions,那麼在procer端實現「消息均衡分發「是必要的。
其中partition leader的位置(host:port)注冊在zookeeper中,procer作為zookeeper client,已經注冊了watch用來監聽partition leader的變更事件。
非同步發送:將多條消息暫且在客戶端buffer起來,並將他們批量的發送到broker,小數據IO太多,會拖慢整體的網路延遲,批量延遲發送事實上提升了網路效率。不過這也有一定的隱患,比如說當procer失效時,那些尚未發送的消息將會丟失。
消費者
consumer端向broker發送「fetch」請求,並告知其獲取消息的offset;此後consumer將會獲得一定條數的消息;consumer端也可以重置offset來重新消費消息。
在JMS實現中,Topic模型基於push方式,即broker將消息推送給consumer端。不過在kafka中,採用了pull方式,即consumer在和broker建立連接之後,主動去pull(或者說fetch)消息;這中模式有些優點,首先consumer端可以根據自己的消費能力適時的去fetch消息並處理,且可以控制消息消費的進度(offset);此外,消費者可以良好的控制消息消費的數量,batch fetch。
其他JMS實現,消息消費的位置是有prodiver保留,以便避免重復發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態。這就要求JMS broker需要太多額外的工作。在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有復雜的消息確認機制,可見kafka broker端是相當輕量級的。當消息被consumer接收之後,consumer可以在本地保存最後消息的offset,並間歇性的向zookeeper注冊offset。由此可見,consumer客戶端也很輕量級。
對於JMS實現,消息傳輸擔保非常直接:有且只有一次(exactly once)。
在kafka中稍有不同:
at most once: 消費者fetch消息,然後保存offset,然後處理消息;當client保存offset之後,但是在消息處理過程中出現了異常,導致部分消息未能繼續處理。那麼此後"未處理"的消息將不能被fetch到,這就是"at most once"。
at least once: 消費者fetch消息,然後處理消息,然後保存offset。如果消息處理成功之後,但是在保存offset階段zookeeper異常導致保存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once",原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態。
exactly once: kafka中並沒有嚴格的去實現(基於2階段提交,事務),我們認為這種策略在kafka中是沒有必要的。
通常情況下「at-least-once」是我們首選。(相比at most once而言,重復接收數據總比丟失數據要好)。
kafka高可用由多個broker組成,每個broker是一個節點;
創建一個topic,這個topic會劃分為多個partition,每個partition存在於不同的broker上,每個partition就放一部分數據。
kafka是一個分布式消息隊列,就是說一個topic的數據,是分散放在不同的機器上,每個機器就放一部分數據。
在0.8版本以前,是沒有HA機制的,就是任何一個broker宕機了,那個broker上的partition就廢了,沒法寫也沒法讀,沒有什麼高可用性可言。
0.8版本以後,才提供了HA機制,也就是就是replica副本機制。每個partition的數據都會同步到其他的機器上,形成自己的多個replica副本。然後所有replica會選舉一個leader出來,那麼生產和消費都跟這個leader打交道,然後其他replica就是follower。
寫的時候,leader會負責把數據同步到所有follower上去,讀的時候就直接讀leader上數據即可。
kafka會均勻的將一個partition的所有replica分布在不同的機器上,從而提高容錯性。
如果某個broker宕機了也沒事,它上面的partition在其他機器上都有副本的,如果這上面有某個partition的leader,那麼此時會重新選舉一個新的leader出來,大家繼續讀寫那個新的leader即可。這就有所謂的高可用性了。
寫數據的時候,生產者就寫leader,然後leader將數據落地寫本地磁碟,接著其他follower自己主動從leader來pull數據。一旦所有follower同步好數據了,就會發送ack給leader,leader收到所有follower的ack之後,就會返回寫成功的消息給生產者。
消息丟失會出現在三個環節,分別是生產者、mq中間件、消費者:
RabbitMQ
Kafka
大體和RabbitMQ相同。
Rabbitmq
需要保證順序的消息投遞到同一個queue中,這個queue只能有一個consumer,如果需要提升性能,可以用內存隊列做排隊,然後分發給底層不同的worker來處理。
Kafka
寫入一個partition中的數據一定是有序的。生產者在寫的時候 ,可以指定一個key,比如指定訂單id作為key,這個訂單相關數據一定會被分發到一個partition中去。消費者從partition中取出數據的時候也一定是有序的,把每個數據放入對應的一個內存隊列,一個partition中有幾條相關數據就用幾個內存隊列,消費者開啟多個線程,每個線程處理一個內存隊列。
Ⅶ windows 5.5.1 版本的PHP 不支持 stomp擴展嗎
嘗試如下操作: 1、 在php.ini中設置extension_dir 指向e:\php5.4\ext; 部分php擴展載入了 2、設置windows系統環境變數, phpext, 指向e:\php5.4\ext, PHPRC 指向e:\php5.4 設置path環境變數,添加e:\php5.4 3、重新啟動apache。
Ⅷ activemq 怎麼實現負載均衡
一、架構和技術介紹
1、簡介
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息匯流排。完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現
2、activemq的特性
1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2. 完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)
3. 對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支持Spring2.0的特性
Ⅸ 大型的PHP應用,通常使用什麼應用做消息隊列
一、消息隊列概述
消息隊列中間件是分布式系統中重要的組件,主要解決應用耦合,非同步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分布式系統不可缺少的中間件。
目前在生產環境,使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。
二、消息隊列應用場景
以下介紹消息隊列在實際應用中常用的使用場景。非同步處理,應用解耦,流量削鋒和消息通訊四個場景。
2.1非同步處理
場景說明:用戶注冊後,需要發注冊郵件和注冊簡訊。傳統的做法有兩種1.串列的方式;2.並行方式。
(1)串列方式:將注冊信息寫入資料庫成功後,發送注冊郵件,再發送注冊簡訊。以上三個任務全部完成後,返回給客戶端。(架構KKQ:466097527,歡迎加入)
(2)並行方式:將注冊信息寫入資料庫成功後,發送注冊郵件的同時,發送注冊簡訊。以上三個任務完成後,返回給客戶端。與串列的差別是,並行的方式可以提高處理的時間。
假設三個業務節點每個使用50毫秒鍾,不考慮網路等其他開銷,則串列方式的時間是150毫秒,並行的時間可能是100毫秒。
因為CPU在單位時間內處理的請求數是一定的,假設CPU1秒內吞吐量是100次。則串列方式1秒內CPU可處理的請求量是7次(1000/150)。並行方式處理的請求量是10次(1000/100)。
小結:如以上案例描述,傳統的方式系統的性能(並發量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?
引入消息隊列,將不是必須的業務邏輯,非同步處理。改造後的架構如下:
按照以上約定,用戶的響應時間相當於是注冊信息寫入資料庫的時間,也就是50毫秒。注冊郵件,發送簡訊寫入消息隊列後,直接返回,因此寫入消息隊列的速度很快,基本可以忽略,因此用戶的響應時間可能是50毫秒。因此架構改變後,系統的吞吐量提高到每秒20 QPS。比串列提高了3倍,比並行提高了兩倍。
2.2應用解耦
場景說明:用戶下單後,訂單系統需要通知庫存系統。傳統的做法是,訂單系統調用庫存系統的介面。如下圖:
傳統模式的缺點:
1) 假如庫存系統無法訪問,則訂單減庫存將失敗,從而導致訂單失敗;
2) 訂單系統與庫存系統耦合;
如何解決以上問題呢?引入應用消息隊列後的方案,如下圖:
訂單系統:用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。
庫存系統:訂閱下單的消息,採用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操作。
假如:在下單時庫存系統不能正常使用。也不影響正常下單,因為下單後,訂單系統寫入消息隊列就不再關心其他的後續操作了。實現訂單系統與庫存系統的應用解耦。
2.3流量削鋒
流量削鋒也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛。
應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。為解決這個問題,一般需要在應用前端加入消息隊列。
可以控制活動的人數;
可以緩解短時間內高流量壓垮應用;
用戶的請求,伺服器接收後,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接拋棄用戶請求或跳轉到錯誤頁面;
秒殺業務根據消息隊列中的請求信息,再做後續處理。
2.4日誌處理
日誌處理是指將消息隊列用在日誌處理中,比如Kafka的應用,解決大量日誌傳輸的問題。架構簡化如下:
日誌採集客戶端,負責日誌數據採集,定時寫受寫入Kafka隊列;
Kafka消息隊列,負責日誌數據的接收,存儲和轉發;
日誌處理應用:訂閱並消費kafka隊列中的日誌數據;
以下是新浪kafka日誌處理應用案例:
(1)Kafka:接收用戶日誌的消息隊列。
(2)Logstash:做日誌解析,統一成JSON輸出給Elasticsearch。
(3)Elasticsearch:實時日誌分析服務的核心技術,一個schemaless,實時的數據存儲服務,通過index組織數據,兼具強大的搜索和統計功能。
(4)Kibana:基於Elasticsearch的數據可視化組件,超強的數據可視化能力是眾多公司選擇ELK stack的重要原因。
2.5消息通訊
消息通訊是指,消息隊列一般都內置了高效的通信機制,因此也可以用在純的消息通訊。比如實現點對點消息隊列,或者聊天室等。
點對點通訊:
客戶端A和客戶端B使用同一隊列,進行消息通訊。
聊天室通訊:
客戶端A,客戶端B,客戶端N訂閱同一主題,進行消息發布和接收。實現類似聊天室效果。
以上實際是消息隊列的兩種消息模式,點對點或發布訂閱模式。模型為示意圖,供參考。
三、消息中間件示例
3.1電商系統
消息隊列採用高可用,可持久化的消息中間件。比如Active MQ,Rabbit MQ,Rocket Mq。(1)應用將主幹邏輯處理完成後,寫入消息隊列。消息發送是否成功可以開啟消息的確認模式。(消息隊列返回消息接收成功狀態後,應用再返回,這樣保障消息的完整性)
(2)擴展流程(發簡訊,配送處理)訂閱隊列消息。採用推或拉的方式獲取消息並處理。
(3)消息將應用解耦的同時,帶來了數據一致性問題,可以採用最終一致性方式解決。比如主數據寫入資料庫,擴展應用根據消息隊列,並結合資料庫方式實現基於消息隊列的後續處理。
3.2日誌收集系統
分為Zookeeper注冊中心,日誌收集客戶端,Kafka集群和Storm集群(OtherApp)四部分組成。
Zookeeper注冊中心,提出負載均衡和地址查找服務;
日誌收集客戶端,用於採集應用系統的日誌,並將數據推送到kafka隊列;
四、JMS消息服務
講消息隊列就不得不提JMS 。JMS(Java Message Service,Java消息服務)API是一個消息服務的標准/規范,允許應用程序組件基於JavaEE平台創建、發送、接收和讀取消息。它使分布式通信耦合度更低,消息服務更加可靠以及非同步性。
在EJB架構中,有消息bean可以無縫的與JM消息服務集成。在J2EE架構模式中,有消息服務者模式,用於實現消息與應用直接的解耦。
4.1消息模型
在JMS標准中,有兩種消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。
4.1.1 P2P模式
P2P模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。每個消息都被發送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時。
P2P的特點
每個消息只有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中)
發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之後,不管接收者有沒有正在運行,它不會影響到消息被發送到隊列
接收者在成功接收消息之後需向隊列應答成功
如果希望發送的每個消息都會被成功處理的話,那麼需要P2P模式。(架構KKQ:466097527,歡迎加入)
4.1.2 Pub/sub模式
包含三個角色主題(Topic),發布者(Publisher),訂閱者(Subscriber) 。多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
Pub/Sub的特點
每個消息可以有多個消費者
發布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創建一個訂閱者之後,才能消費發布者的消息。
為了消費消息,訂閱者必須保持運行的狀態。
為了緩和這樣嚴格的時間相關性,JMS允許訂閱者創建一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運行),它也能接收到發布者的消息。
如果希望發送的消息可以不被做任何處理、或者只被一個消息者處理、或者可以被多個消費者處理的話,那麼可以採用Pub/Sub模型。
4.2消息消費
在JMS中,消息的產生和消費都是非同步的。對於消費來說,JMS的消息者可以通過兩種方式來消費消息。
(1)同步
訂閱者或接收者通過receive方法來接收消息,receive方法在接收到消息之前(或超時之前)將一直阻塞;
(2)非同步
訂閱者或接收者可以注冊為一個消息監聽器。當消息到達之後,系統自動調用監聽器的onMessage方法。
JNDI:Java命名和目錄介面,是一種標準的Java命名系統介面。可以在網路上查找和訪問服務。通過指定一個資源名稱,該名稱對應於資料庫或命名服務中的一個記錄,同時返回資源連接建立所必須的信息。
JNDI在JMS中起到查找和訪問發送目標或消息來源的作用。(架構KKQ:466097527,歡迎加入)
4.3JMS編程模型
(1) ConnectionFactory
創建Connection對象的工廠,針對兩種不同的jms消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查找ConnectionFactory對象。
(2) Destination
Destination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對於消息生產者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對於消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。
所以,Destination實際上就是兩種類型的對象:Queue、Topic可以通過JNDI來查找Destination。
(3) Connection
Connection表示在客戶端和JMS系統之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。
(4) Session
Session是操作消息的介面。可以通過session創建生產者、消費者、消息等。Session提供了事務的功能。當需要使用session發送/接收多個消息時,可以將這些發送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。
(5) 消息的生產者
消息生產者由Session創建,並用於將消息發送到Destination。同樣,消息生產者分兩種類型:QueueSender和TopicPublisher。可以調用消息生產者的方法(send或publish方法)發送消息。
(6) 消息消費者
消息消費者由Session創建,用於接收被發送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創建。當然,也可以session的creatDurableSubscriber方法來創建持久化的訂閱者。
(7) MessageListener
消息監聽器。如果注冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。
深入學習JMS對掌握JAVA架構,EJB架構有很好的幫助,消息中間件也是大型分布式系統必須的組件。本次分享主要做全局性介紹,具體的深入需要大家學習,實踐,總結,領會。
五、常用消息隊列
一般商用的容器,比如WebLogic,JBoss,都支持JMS標准,開發上很方便。但免費的比如Tomcat,Jetty等則需要使用第三方的消息中間件。本部分內容介紹常用的消息中間件(Active MQ,Rabbit MQ,Zero MQ,Kafka)以及他們的特點。
5.1 ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息匯流排。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。
ActiveMQ特性如下:
⒈ 多種語言和協議編寫客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
⒉ 完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)
⒊ 對spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支持Spring2.0的特性
⒋ 通過了常見J2EE伺服器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業伺服器上
⒌ 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍ 支持通過JDBC和journal提供高速的消息持久化
⒎ 從設計上保證了高性能的集群,客戶端-伺服器,點對點
⒏ 支持Ajax
⒐ 支持與Axis的整合
⒑ 可以很容易得調用內嵌JMS provider,進行測試
5.2 RabbitMQ
RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。RabbitMQ是AMQP(高級消息隊列協議)的標准實現。支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
幾個重要概念:
Broker:簡單來說就是消息隊列伺服器實體。
Exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的許可權分離。
procer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。
消息隊列的使用過程,如下:
(1)客戶端連接到消息隊列伺服器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
(5)客戶端投遞消息到exchange。
exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。
5.3 ZeroMQ
號稱史上最快的消息隊列,它實際類似於Socket的一系列介面,他跟Socket的區別是:普通的socket是端到端的(1:1的關系),而ZMQ卻是可以N:M 的關系,人們對BSD套接字的了解較多的是點對點的連接,點對點連接需要顯式地建立連接、銷毀連接、選擇協議(TCP/UDP)和處理錯誤等,而ZMQ屏蔽了這些細節,讓你的網路編程更為簡單。ZMQ用於node與node間的通信,node可以是主機或者是進程。
引用官方的說法: 「ZMQ(以下ZeroMQ簡稱ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個socket library,他使得Socket編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。ZMQ的明確目標是「成為標准網路協議棧的一部分,之後進入Linux內核」。現在還未看到它們的成功。但是,它無疑是極具前景的、並且是人們更加需要的「傳統」BSD套接字之上的一 層封裝。ZMQ讓編寫高性能網路應用程序極為簡單和有趣。」
特點是:
高性能,非持久化;
跨平台:支持Linux、Windows、OS X等。
多語言支持; C、C++、Java、.NET、Python等30多種開發語言。
可單獨部署或集成到應用中使用;
可作為Socket通信庫使用。
與RabbitMQ相比,ZMQ並不像是一個傳統意義上的消息隊列伺服器,事實上,它也根本不是一個伺服器,更像一個底層的網路通訊庫,在Socket API之上做了一層封裝,將網路通訊、進程通訊和線程通訊抽象為統一的API介面。支持「Request-Reply 「,」Publisher-Subscriber「,」Parallel Pipeline」三種基本模型和擴展模型。
ZeroMQ高性能設計要點:
1、無鎖的隊列模型
對於跨線程間的交互(用戶端和session)之間的數據交換通道pipe,採用無鎖的隊列演算法CAS;在pipe兩端注冊有非同步事件,在讀或者寫消息到pipe的時,會自動觸發讀寫事件。
2、批量處理的演算法
對於傳統的消息處理,每個消息在發送和接收的時候,都需要系統的調用,這樣對於大量的消息,系統的開銷比較大,zeroMQ對於批量的消息,進行了適應性的優化,可以批量的接收和發送消息。
3、多核下的線程綁定,無須CPU切換
區別於傳統的多線程並發模式,信號量或者臨界區, zeroMQ充分利用多核的優勢,每個核綁定運行一個工作者線程,避免多線程之間的CPU切換開銷。
5.4 Kafka
Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 對於像Hadoop的一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的消息處理,也是為了通過集群機來提供實時的消費。
Kafka是一種高吞吐量的分布式發布訂閱消息系統,有如下特性:
通過O(1)的磁碟數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。(文件追加的方式寫入數據,過期的數據定期刪除)
高吞吐量:即使是非常普通的硬體Kafka也可以支持每秒數百萬的消息。
支持通過Kafka伺服器和消費機集群來分區消息。
支持Hadoop並行數據載入。
Kafka相關概念
Broker
Kafka集群包含一個或多個伺服器,這種伺服器被稱為broker[5]
Topic
每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處)
Partition
Parition是物理上的概念,每個Topic包含一個或多個Partition.
Procer
負責發布消息到Kafka broker
Consumer
消息消費者,向Kafka broker讀取消息的客戶端。
Consumer Group
每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)。
一般應用在大數據日誌處理或對實時性(少量延遲),可靠性(少量丟數據)要求稍低的場景使用。
Ⅹ Activemq的stomp怎麼能一條一條地接收消息,而不是全部接收
ActiveMQ另問題要軟體能掛掉掛掉怕怕掛掉信息給丟所本節析幾種持久化式:
、持久化文件
ActiveMQ默認支持種式要發消息設置消息持久化
打安裝目錄配置文件:
D:\ActiveMQ\apache-activemq\conf\activemq.xml越80行發現默認配置項:
注意使用kahaDB基於文件支持事務消息存儲器靠高性能擴展消息存儲器
設計初衷使用簡單並盡能快KahaDB索引使用transaction log並且所destination使用index測試表明:用於產環境支持1萬active connection每connection獨立queue該表現已經足矣應付部需求
再發送消息候改變第二參數:
MsgDeliveryMode.Persistent
Message保存式2種
PERSISTENT:保存磁碟consumer消費message刪除
NON_PERSISTENT:保存內存消費message清除
注意:堆積消息太能導致內存溢
打產者端發送消息:
wps30F4.tmp
啟消費者端同管理界面查看:
wps3105.tmp
發現消息等待沒持久化ActiveMQ宕機重啟消息丟失我現修改文件持久化重啟ActiveMQ消費者仍能夠收消息
wps3106.tmp
二、持久化資料庫
我支持Mysql例先載mysql-connector-java-5.1.34-bin.jar包放:
D:\ActiveMQ\apache-activemq\lib目錄
打並修改配置文件:
復制代碼
<beans
xmlns=""
xmlns:xsi=""
xsi:schemaLocation=" /spring-beans.xsd
/activemq-core.xsd">
file:${activemq.conf}/credentials.properties
<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
<!--
The element is used to configure the ActiveMQ broker.
-->
<!-- The is used to prevent
slow topic consumers to block procers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
-->
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
-->
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
-->
<!--
The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down procers. For more information, see:
-->
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
-->
<!--
Enable web consoles, REST and Ajax APIs and demos
The web consoles requires by default login, you can disable this in the jetty.xml file
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
-