導航:首頁 > 編程語言 > rabbitmqphp示例

rabbitmqphp示例

發布時間:2024-12-16 21:58:45

㈠ 如何用php獲取rabbitmq指定隊列中的未處理消息數量

lare一個隊列,置AMQP_PASSIVE標志位,就不會影響服務端狀態,並返回消息計數。
$conn = new AMQPConnection();
//...
$queue = new AMQPQueue($conn);
$queue->setFlags(AMQP_PASSIVE);
$messageCount = $queue->declare($queueName); // <- 這里
//...

㈡ 學習thinkphp6.0使用rabbitmq示例


以下是如何在ThinkPHP 6.0中使用RabbitMQ進行示例,並通過supervisor實現消費者守護的簡要步驟:


1. 安裝與配置



2. 消息處理



3. 交換機與路由


RabbitMQ的關鍵概念包括:Broker(消息隊列服務)、Exchange(消息路由)、Queue(消息隊列)、Binding(綁定關系)和Routingkey(路由鍵)。消息流程包括生產者通過Exchange將消息路由到Queue,Queue再將消息分發給消費端。


交換機類型

㈢ 延時 (遲) 操作的 PHP 簡單實現

在業務中,遇到延遲操作需求,比如下單半小時未支付取消訂單,或下單十五分鍾後未支付發簡訊提醒等場景。本文將介紹幾種實現方式。

實現方式分為幾種,首先,執行處理腳本在下單後定時運行,這種方式簡潔但不優雅,且在處理大量訂單時可能遇到性能瓶頸。

第二種方式是利用消費邏輯,在消費者中處理延遲操作,通過啟動生產消息與消費者等待,這種方式利用現有服務,減少開發時間。

第三種方式引入消息隊列如RabbitMQ。消費者啟動,通過生產者啟動消息,消息先入隊列,過期後進入死信隊列被消費,流轉過程清晰可見。

第四種方式,參考原文「延時(遲)操作的PHP簡單實現」,提供進階PHP學習資源,包括分布式架構、高並發、高性能等知識點,助力PHP開發者提升技能。

希望以上內容能幫助大家解決業務中遇到的延遲操作問題,推動PHP開發者在進階道路上成長。

㈣ 大型的 PHP應用 通常使用什麼應用做 消息隊列 的

一、消息隊列概述
消息隊列中間件是分布式系統中重要的組件,主要解決應用耦合,非同步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分布式系統不可缺少的中間件。
目前在生產環境,使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。
二、消息隊列應用場景
以下介紹消息隊列在實際應用中常用的使用場景。非同步處理,應用解耦,流量削鋒和消息通訊四個場景。
2.1非同步處理
場景說明:用戶注冊後,需要發注冊郵件和注冊簡訊。傳統的做法有兩種1.串列的方式;2.並行方式。
(1)串列方式:將注冊信息寫入資料庫成功後,發送注冊郵件,再發送注冊簡訊。以上三個任務全部完成後,返回給客戶端。(架構KKQ:466097527,歡迎加入)
(2)並行方式:將注冊信息寫入資料庫成功後,發送注冊郵件的同時,發送注冊簡訊。以上三個任務完成後,返回給客戶端。與串列的差別是,並行的方式可以提高處理的時間。
假設三個業務節點每個使用50毫秒鍾,不考慮網路等其他開銷,則串列方式的時間是150毫秒,並行的時間可能是100毫秒。
因為CPU在單位時間內處理的請求數是一定的,假設CPU1秒內吞吐量是100次。則串列方式1秒內CPU可處理的請求量是7次(1000/150)。並行方式處理的請求量是10次(1000/100)。
小結:如以上案例描述,傳統的方式系統的性能(並發量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?
引入消息隊列,將不是必須的業務邏輯,非同步處理。改造後的架構如下:
按照以上約定,用戶的響應時間相當於是注冊信息寫入資料庫的時間,也就是50毫秒。注冊郵件,發送簡訊寫入消息隊列後,直接返回,因此寫入消息隊列的速度很快,基本可以忽略,因此用戶的響應時間可能是50毫秒。因此架構改變後,系統的吞吐量提高到每秒20 QPS。比串列提高了3倍,比並行提高了兩倍。
2.2應用解耦
場景說明:用戶下單後,訂單系統需要通知庫存系統。傳統的做法是,訂單系統調用庫存系統的介面。如下圖:
傳統模式的缺點:
1) 假如庫存系統無法訪問,則訂單減庫存將失敗,從而導致訂單失敗;
2) 訂單系統與庫存系統耦合;
如何解決以上問題呢?引入應用消息隊列後的方案,如下圖:
訂單系統:用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。
庫存系統:訂閱下單的消息,採用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操作。
假如:在下單時庫存系統不能正常使用。也不影響正常下單,因為下單後,訂單系統寫入消息隊列就不再關心其他的後續操作了。實現訂單系統與庫存系統的應用解耦。
2.3流量削鋒
流量削鋒也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛。
應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。為解決這個問題,一般需要在應用前端加入消息隊列。
可以控制活動的人數;
可以緩解短時間內高流量壓垮應用;
用戶的請求,伺服器接收後,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接拋棄用戶請求或跳轉到錯誤頁面;
秒殺業務根據消息隊列中的請求信息,再做後續處理。
2.4日誌處理
日誌處理是指將消息隊列用在日誌處理中,比如Kafka的應用,解決大量日誌傳輸的問題。架構簡化如下:
日誌採集客戶端,負責日誌數據採集,定時寫受寫入Kafka隊列;
Kafka消息隊列,負責日誌數據的接收,存儲和轉發;
日誌處理應用:訂閱並消費kafka隊列中的日誌數據;
以下是新浪kafka日誌處理應用案例:
(1)Kafka:接收用戶日誌的消息隊列。
(2)Logstash:做日誌解析,統一成JSON輸出給Elasticsearch。
(3)Elasticsearch:實時日誌分析服務的核心技術,一個schemaless,實時的數據存儲服務,通過index組織數據,兼具強大的搜索和統計功能。
(4)Kibana:基於Elasticsearch的數據可視化組件,超強的數據可視化能力是眾多公司選擇ELK stack的重要原因。
2.5消息通訊
消息通訊是指,消息隊列一般都內置了高效的通信機制,因此也可以用在純的消息通訊。比如實現點對點消息隊列,或者聊天室等。
點對點通訊:
客戶端A和客戶端B使用同一隊列,進行消息通訊。
聊天室通訊:
客戶端A,客戶端B,客戶端N訂閱同一主題,進行消息發布和接收。實現類似聊天室效果。
以上實際是消息隊列的兩種消息模式,點對點或發布訂閱模式。模型為示意圖,供參考。
三、消息中間件示例
3.1電商系統
消息隊列採用高可用,可持久化的消息中間件。比如Active MQ,Rabbit MQ,Rocket Mq。(1)應用將主幹邏輯處理完成後,寫入消息隊列。消息發送是否成功可以開啟消息的確認模式。(消息隊列返回消息接收成功狀態後,應用再返回,這樣保障消息的完整性)
(2)擴展流程(發簡訊,配送處理)訂閱隊列消息。採用推或拉的方式獲取消息並處理。
(3)消息將應用解耦的同時,帶來了數據一致性問題,可以採用最終一致性方式解決。比如主數據寫入資料庫,擴展應用根據消息隊列,並結合資料庫方式實現基於消息隊列的後續處理。
3.2日誌收集系統
分為Zookeeper注冊中心,日誌收集客戶端,Kafka集群和Storm集群(OtherApp)四部分組成。
Zookeeper注冊中心,提出負載均衡和地址查找服務;
日誌收集客戶端,用於採集應用系統的日誌,並將數據推送到kafka隊列;
四、JMS消息服務
講消息隊列就不得不提JMS 。JMS(Java Message Service,Java消息服務)API是一個消息服務的標准/規范,允許應用程序組件基於JavaEE平台創建、發送、接收和讀取消息。它使分布式通信耦合度更低,消息服務更加可靠以及非同步性。
在EJB架構中,有消息bean可以無縫的與JM消息服務集成。在J2EE架構模式中,有消息服務者模式,用於實現消息與應用直接的解耦。
4.1消息模型
在JMS標准中,有兩種消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。
4.1.1 P2P模式
P2P模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。每個消息都被發送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時。
P2P的特點
每個消息只有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中)
發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之後,不管接收者有沒有正在運行,它不會影響到消息被發送到隊列
接收者在成功接收消息之後需向隊列應答成功
如果希望發送的每個消息都會被成功處理的話,那麼需要P2P模式。(架構KKQ:466097527,歡迎加入)
4.1.2 Pub/sub模式
包含三個角色主題(Topic),發布者(Publisher),訂閱者(Subscriber) 。多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
Pub/Sub的特點
每個消息可以有多個消費者
發布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創建一個訂閱者之後,才能消費發布者的消息。
為了消費消息,訂閱者必須保持運行的狀態。
為了緩和這樣嚴格的時間相關性,JMS允許訂閱者創建一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運行),它也能接收到發布者的消息。
如果希望發送的消息可以不被做任何處理、或者只被一個消息者處理、或者可以被多個消費者處理的話,那麼可以採用Pub/Sub模型。
4.2消息消費
在JMS中,消息的產生和消費都是非同步的。對於消費來說,JMS的消息者可以通過兩種方式來消費消息。
(1)同步
訂閱者或接收者通過receive方法來接收消息,receive方法在接收到消息之前(或超時之前)將一直阻塞;
(2)非同步
訂閱者或接收者可以注冊為一個消息監聽器。當消息到達之後,系統自動調用監聽器的onMessage方法。
JNDI:Java命名和目錄介面,是一種標準的Java命名系統介面。可以在網路上查找和訪問服務。通過指定一個資源名稱,該名稱對應於資料庫或命名服務中的一個記錄,同時返回資源連接建立所必須的信息。
JNDI在JMS中起到查找和訪問發送目標或消息來源的作用。(架構KKQ:466097527,歡迎加入)
4.3JMS編程模型
(1) ConnectionFactory
創建Connection對象的工廠,針對兩種不同的jms消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查找ConnectionFactory對象。
(2) Destination
Destination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對於消息生產者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對於消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。
所以,Destination實際上就是兩種類型的對象:Queue、Topic可以通過JNDI來查找Destination。
(3) Connection
Connection表示在客戶端和JMS系統之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。
(4) Session
Session是操作消息的介面。可以通過session創建生產者、消費者、消息等。Session提供了事務的功能。當需要使用session發送/接收多個消息時,可以將這些發送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。
(5) 消息的生產者
消息生產者由Session創建,並用於將消息發送到Destination。同樣,消息生產者分兩種類型:QueueSender和TopicPublisher。可以調用消息生產者的方法(send或publish方法)發送消息。
(6) 消息消費者
消息消費者由Session創建,用於接收被發送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創建。當然,也可以session的creatDurableSubscriber方法來創建持久化的訂閱者。
(7) MessageListener
消息監聽器。如果注冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。
深入學習JMS對掌握JAVA架構,EJB架構有很好的幫助,消息中間件也是大型分布式系統必須的組件。本次分享主要做全局性介紹,具體的深入需要大家學習,實踐,總結,領會。
五、常用消息隊列
一般商用的容器,比如WebLogic,JBoss,都支持JMS標准,開發上很方便。但免費的比如Tomcat,Jetty等則需要使用第三方的消息中間件。本部分內容介紹常用的消息中間件(Active MQ,Rabbit MQ,Zero MQ,Kafka)以及他們的特點。
5.1 ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息匯流排。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。
ActiveMQ特性如下:
⒈ 多種語言和協議編寫客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
⒉ 完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)
⒊ 對spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支持Spring2.0的特性
⒋ 通過了常見J2EE伺服器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業伺服器上
⒌ 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍ 支持通過JDBC和journal提供高速的消息持久化
⒎ 從設計上保證了高性能的集群,客戶端-伺服器,點對點
⒏ 支持Ajax
⒐ 支持與Axis的整合
⒑ 可以很容易得調用內嵌JMS provider,進行測試
5.2 RabbitMQ
RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。RabbitMQ是AMQP(高級消息隊列協議)的標准實現。支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
幾個重要概念:
Broker:簡單來說就是消息隊列伺服器實體。
Exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的許可權分離。
procer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。
消息隊列的使用過程,如下:
(1)客戶端連接到消息隊列伺服器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
(5)客戶端投遞消息到exchange。
exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。
5.3 ZeroMQ
號稱史上最快的消息隊列,它實際類似於Socket的一系列介面,他跟Socket的區別是:普通的socket是端到端的(1:1的關系),而ZMQ卻是可以N:M 的關系,人們對BSD套接字的了解較多的是點對點的連接,點對點連接需要顯式地建立連接、銷毀連接、選擇協議(TCP/UDP)和處理錯誤等,而ZMQ屏蔽了這些細節,讓你的網路編程更為簡單。ZMQ用於node與node間的通信,node可以是主機或者是進程。
引用官方的說法: 「ZMQ(以下ZeroMQ簡稱ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個socket library,他使得Socket編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。ZMQ的明確目標是「成為標准網路協議棧的一部分,之後進入Linux內核」。現在還未看到它們的成功。但是,它無疑是極具前景的、並且是人們更加需要的「傳統」BSD套接字之上的一 層封裝。ZMQ讓編寫高性能網路應用程序極為簡單和有趣。」
特點是:
高性能,非持久化;
跨平台:支持Linux、Windows、OS X等。
多語言支持; C、C++、Java、.NET、Python等30多種開發語言。
可單獨部署或集成到應用中使用;
可作為Socket通信庫使用。
與RabbitMQ相比,ZMQ並不像是一個傳統意義上的消息隊列伺服器,事實上,它也根本不是一個伺服器,更像一個底層的網路通訊庫,在Socket API之上做了一層封裝,將網路通訊、進程通訊和線程通訊抽象為統一的API介面。支持「Request-Reply 「,」Publisher-Subscriber「,」Parallel Pipeline」三種基本模型和擴展模型。
ZeroMQ高性能設計要點:
1、無鎖的隊列模型
對於跨線程間的交互(用戶端和session)之間的數據交換通道pipe,採用無鎖的隊列演算法CAS;在pipe兩端注冊有非同步事件,在讀或者寫消息到pipe的時,會自動觸發讀寫事件。
2、批量處理的演算法
對於傳統的消息處理,每個消息在發送和接收的時候,都需要系統的調用,這樣對於大量的消息,系統的開銷比較大,zeroMQ對於批量的消息,進行了適應性的優化,可以批量的接收和發送消息。
3、多核下的線程綁定,無須CPU切換
區別於傳統的多線程並發模式,信號量或者臨界區, zeroMQ充分利用多核的優勢,每個核綁定運行一個工作者線程,避免多線程之間的CPU切換開銷。
5.4 Kafka
Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 對於像Hadoop的一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的消息處理,也是為了通過集群機來提供實時的消費。
Kafka是一種高吞吐量的分布式發布訂閱消息系統,有如下特性:
通過O(1)的磁碟數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。(文件追加的方式寫入數據,過期的數據定期刪除)
高吞吐量:即使是非常普通的硬體Kafka也可以支持每秒數百萬的消息。
支持通過Kafka伺服器和消費機集群來分區消息。
支持Hadoop並行數據載入。
Kafka相關概念
Broker
Kafka集群包含一個或多個伺服器,這種伺服器被稱為broker[5]
Topic
每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處)
Partition
Parition是物理上的概念,每個Topic包含一個或多個Partition.
Procer
負責發布消息到Kafka broker
Consumer
消息消費者,向Kafka broker讀取消息的客戶端。
Consumer Group
每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)。
一般應用在大數據日誌處理或對實時性(少量延遲),可靠性(少量丟數據)要求稍低的場景使用。

㈤ PHP-RabbitMQ學習日記(一)

給自己做一個記錄

本文主要介紹RabbitMQ的基礎概念、Windows平台上的安裝和啟動、關閉流程以及一些小知識。

RabbitMQ是一個基於AMQP協議的消息隊列軟體,採用Erlang語言編寫。其主要組成部分包括虛擬主機、通道、交換機、隊列、路由關鍵字、消息生產者和消息消費者。

虛擬主機用於實現不同用戶的許可權隔離;通道建立訪問通道;交換機指定消息按特定規則路由到隊列;隊列接收並存儲消息;路由關鍵字用於交換機進行消息投遞;生產者投遞消息;消費者接收消息。

例如,想像你作為生產者去逛街,買了一件粉色的口罩作為禮物給你的女友。由於你有事無法立刻給她,選擇將口罩暫時放在家裡,然後讓你的女友自己來取。使用RabbitMQ連接,你選擇了碧桂園小區(虛擬主機)作為家的位置,從A道路(選擇或建立通道)回到小區,使用門卡(路由)進入小區,最終將禮物放在B棟9樓(隊列)。

之後,你的女友根據提供的信息前往家,取走了口罩,可能還會用微信向你抱怨一番。

在Windows平台上安裝和使用RabbitMQ,首先需要安裝Erlang環境和RabbitMQ伺服器。

安裝步驟包括下載安裝Erlang OTP For Windows(erlang的環境)和RabbitMQ Server Windows Installer(RabbitMQ伺服器安裝程序)。安裝完成後,使用cmd命令檢查是否安裝成功,具體操作為在安裝目錄下執行rabbitmqctl status。

為了方便管理和監控RabbitMQ,可以安裝RabbitMQ Web管理插件,通過執行rabbitmq-plugins enable rabbitmq_management命令來啟用。

如果在安裝過程中遇到問題,可以使用特定命令進行重裝或解決問題。

使用瀏覽器訪問 127.0.0.1:15672/ 可以訪問RabbitMQ Web管理界面,默認賬號密碼為guest。

在使用RabbitMQ時還需注意埠問題,RabbitMQ默認使用5672、15672、25672埠。

關於RabbitMQ的持久化功能,包括Exchange持久化、Queue持久化和消息持久化。如果設置Exchange和Queue持久化,路由信息也會自動持久化。

消息確認機制使得生產者在處理完消息後需向RabbitMQ返回ack信息,確保消息被正確處理。消息重回隊列功能則在出現突發情況時,將未收到ack的消息重新送回隊列,避免消息丟失。

以上內容僅為RabbitMQ的基礎介紹,實際應用中還有更多細節需要探索和驗證。在學習過程中,保持好奇心,積極解決問題,不斷積累經驗,將有助於更深入地理解RabbitMQ。

㈥ php 從rabbitmq consume 和 get的區別

以下是阿里雲查到的解釋

在RabbitMQ中消費者有2種方式獲取隊列中的消息:

a)一種是通過basic.consume命令,訂閱某一個隊列中的消息,channel會自動在處理完上一條消息之後,接收下一條消息。(同一個channel消息處理是串列的)。除非關閉channel或者取消訂閱,否則客戶端將會一直接收隊列的消息。

b)另外一種方式是通過basic.get命令主動獲取隊列中的消息,但是絕對不可以通過循環調用basic.get來代替basic.consume,這是因為basic.get RabbitMQ在實際執行的時候,是首先consume某一個隊列,然後檢索第一條消息,然後再取消訂閱。如果是高吞吐率的消費者,最好還是建議使用basic.consume。

簡單總結一下就是說:

consume是只要隊列裡面還有消息就一直取。

get是只取了隊列裡面的第一條消息。

因為get開銷大,如果需要從一個隊列取消息的話,首選consume方式,慎用循環get方式。

㈦ PHP+Laravel框架RabbitMQ簡單使用(PTP)

一、簡介

Point-to-Point,點對點通信模型。PTP是基於隊列(Queue)的,一個隊列可以有多個生產者,和多個消費者。消息伺服器按照收到消息的先後順序,將消息放到隊列中。隊列中的每一條消息,只能由一個消費者進行消費,消費之後就會從隊列中移除。

特點:

每個消息只用一個消費者;

發送者和接受者沒有時間依賴;

接受者確認消息接受和處理成功。

P 表示為生產者 、C 表示為消費者,紅色表示隊列。

在RabbitMQ中有生產者,消費者的概念,本篇主要是消息如何生產以及消費者這部分的實現。使用的laravel框架,php-amqplib拓展。

二、Laravel中添加依賴

在項目根目錄下執行一下命令

composer require php-amqplib/php-amqplib

lishuo@李碩的MacBook?Pro:~/Code/php/www.zfw.com?(branch:?master!)$?composer?require?php-amqplib/php-amqplibUsing?version?^3.1?for?php-amqplib/php-amqplib./composer.json?has?been?updatedRunning?composer?update?php-amqplib/php-amqplibLoading?composer?repositories?with?package?informationUpdating?dependenciesNothing?to?modify?in?lock?fileInstalling?dependencies?from?lock?file?(including?require-dev)Nothing?to?install,?update?or?removePackage?caouecs/laravel-lang?is?abandoned,?you?should?avoid?using?it.?Use?https://github.com/Laravel-Lang/lang?instead.Package?swiftmailer/swiftmailer?is?abandoned,?you?should?avoid?using?it.?Use?symfony/mailer?instead.Generating?optimized?autoload?files>?::postAutoloadDump>?@php?artisan?package:discover?--ansiDiscovered?Package:?barryvdh/laravel-ide-helperDiscovered?Package:?facade/ignitionDiscovered?Package:?fruitcake/laravel-corsDiscovered?Package:?jenssegers/mongodbDiscovered?Package:?laravel/passportDiscovered?Package:?laravel/sailDiscovered?Package:?laravel/sanctumDiscovered?Package:?laravel/tinkerDiscovered?Package:?maatwebsite/excelDiscovered?Package:?nesbot/carbonDiscovered?Package:?nunomaro/collisionPackage?manifest?generated?successfully.100?packages?you?are?using?are?looking?for?funding.Use?the?`composer?fund`?command?to?find?out?more!>?@php?artisan?vendor:publish?--tag=laravel-assets?--ansiNo?publishable?resources?for?tag?[laravel-assets].Publishing?complete.三、使用Laravel的command來實現消息的生產和消費1.創建生產者

執行以下命令快速創建生產者

php artisan make:command RabbitmqProcerCommand

lishuo@李碩的MacBook?Pro:~/Code/php/www.zfw.com?(branch:?master!)$?php?artisan?make:command?RabbitmqProcerCommandConsole?command?created?successfully.基本代碼(接下來就在command裡面寫生產消息的邏輯)<?phpnamespace?AppConsoleCommands;use?IlluminateConsoleCommand;//引入amqp擴展use?;use?PhpAmqpLibMessageAMQPMessage;class?RabbitmqProcerCommand?extends?Command{????/**?????*?The?name?and?signature?of?the?console?command.?????*?????*?@var?string?????*/????protected?$signature?=?'rabbitmq_procer'//給生產者起個command名稱????/**?????*?The?console?command?description.?????*?????*?@var?string?????*/????protected?$description?=?'Command?description'????/**?????*?Create?a?new?command?instance.?????*?????*?@return?void?????*/????public?function?__construct()????{????????parent::__construct();????}????/**?????*?Execute?the?console?command.?????*??生產者消息代碼?????*?@return?int?????*/????public?function?handle()????{????????//創建伺服器連接????????$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');????????//連接信道????????//信道是生產消費者與rabbit通信的渠道,生產者publish或者消費者消費一個隊列都是需要通過信道來通信的????????//信道是建立在TCP上面的虛擬鏈接,也就是rabbitMQ在一個TCP上面建立成百上千的信道來達到多個線程處理。????????//注意是一個TCP?被多個線程共享,每個線程對應一個信道,信道在rabbit都有唯一的ID,保證了信道的私有性,對應上唯一的線程使用。????????$channel?=?$connection->channel();????????//channel->queue_declare通過信道創建一個是否是持久化的消息隊列????????//queue第一個參數代表消息隊列名稱????????$channel->queue_declare('test',?false,?false,?false,?false);????????//往隊列里要發送內容,待發送的內容????????$msg?=?new?AMQPMessage('我是一個生產者消息');????????//通過信道來進行發送消息????????//而exchange是怎麼知道消息應該推到哪個queue呢,這就要通過綁定queue與exchange時的routingkey了,通過代碼進行綁定並且指定routingkey,下面有一張關系圖,p(發布者)?—>?x(exchange)?bindding(綁定關系也就是我們的routingkey)?紅色代表著queue????????$channel->basic_publish($msg,?'',?'test');????????echo?"?[x]?Sent?'我是一個生產者消息!' ";????????//關閉信道????????$channel->close();????????//關閉連接????????$connection->close();????}}2.創建消費者

因為消費者是需要常駐內存的,所以需要在cli下運行,我們可以通過以下操作創建一個任務。

?php?artisan?make:command?RabbitmqConsumerCommand基本代碼(接下來就在command裡面寫消費消息的邏輯)<?phpnamespace?AppConsoleCommands;use?IlluminateConsoleCommand;use?;class?RabbitmqConsumerCommand?extends?Command{????/**?????*?The?name?and?signature?of?the?console?command.?????*?????*?@var?string?????*/????protected?$signature?=?'rabbitmq_consumer'//給消費者起個command名稱????/**?????*?The?console?command?description.?????*?????*?@var?string?????*/????protected?$description?=?'Command?description'????/**?????*?Create?a?new?command?instance.?????*?????*?@return?void?????*/????public?function?__construct()????{????????parent::__construct();????}????/**?????*?Execute?the?console?command.?????*?????*?@return?int?????*/????public?function?handle()????{????????//創建伺服器連接????????$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');????????//連接信道????????//信道是生產消費者與rabbit通信的渠道,生產者publish或者消費者消費一個隊列都是需要通過信道來通信的????????//信道是建立在TCP上面的虛擬鏈接,也就是rabbitMQ在一個TCP上面建立成百上千的信道來達到多個線程處理。????????//注意是一個TCP?被多個線程共享,每個線程對應一個信道,信道在rabbit都有唯一的ID,保證了信道的私有性,對應上唯一的線程使用。????????$channel?=?$connection->channel();????????//channel->queue_declare通過信道創建一個是否是持久化的消息隊列????????//queue第一個參數代表消息隊列名稱????????$channel->queue_declare('test',?false,?false,?false,?false);????????echo?"?[*]?Waiting?for?messages.?To?exit?press?CTRL+C ";????????//進行監聽消費者是否有消息,如果有進行輸出消息內容????????$callback?=?function?($msg)?{????????????echo?'?[x]?Received?',?$msg->body,?" ";????????};????????//通過信道進行消費消息????????$channel->basic_consume('test',?'',?false,?true,?false,?false,?$callback);????????//如果信道是打開狀態????????while?($channel->is_open())?{????????????//然後讓信道一直處於監聽等待狀態????????????$channel->wait();????????}????????//關閉信道????????$channel->close();????????//關閉連接????????$connection->close();????}}三、使用command進行測試生產消息和消費消息是否成功

執行生產消息 php artisan rabbitmq_procer 執行消費消息 hp artisan rabbitmq_consumer

閱讀全文

與rabbitmqphp示例相關的資料

熱點內容
windows源碼上傳 瀏覽:651
在APP中繼續是什麼意思 瀏覽:138
程序員哪裡寫博客最好 瀏覽:32
android自動部署 瀏覽:507
加密文件的密鑰丟失 瀏覽:403
android百度天氣api 瀏覽:636
伺服器桌面上的雲上pdf 瀏覽:966
加密的excel文件可以列印嗎 瀏覽:24
javadate類型的格式 瀏覽:248
應用加密指紋登錄 瀏覽:50
頁面不規范的app有什麼 瀏覽:798
有機波譜分析第三版答案pdf 瀏覽:318
每個網站都有什麼伺服器 瀏覽:437
桃子app怎麼下載 瀏覽:776
豎式計算的手指演算法 瀏覽:507
黑馬程序員前端 瀏覽:300
swich硬破伺服器沒有了怎麼辦 瀏覽:933
對於分類演算法的表述不正確的是 瀏覽:568
電腦上下了種子怎麼解壓 瀏覽:834
海龍工具破解版壓縮包解壓密碼 瀏覽:834