导航:首页 > 编程语言 > python消息中间件

python消息中间件

发布时间:2022-12-20 22:18:51

❶ 消息中间件之RabbitMQ

JMS:java Message Service,java消息服务,是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。

p2p:点对点发送,一个消息只能被消费一次
涉及:
消息队列(Queue)
发送者(Sender)
接收者(Receiver)
每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着信息,直到它们被消费或超时。
示意图:p2p示意图
特点:

Pub/Sub:发布订阅,一个消息可以被消费多次
涉及角色:
主题(Topic)
发布者(Publisher)
订阅者(Subscriber)
客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
示意图:Pub/Sub示意图
特点:

MQ:消息中间件(MOM:Message Orient middleware),消息队列
作为系统间通信的必备技术,低耦合、可靠传输、流量控制、最终一致性
实现异步消息通信

Apache下
完全支持Java的JMS协议
消息模式:1、点对点 2、发布订阅

Erlang语言实现的开源的MQ中间件,支持多种协议
主要的通信协议是AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用协议的一个开放标准,为面向消息的中间件设计。

Apache下开源项目
高性能分布式消息队列,一般海量数据传输,大数据部门用
单机吞吐量:10w/s

阿里 贡献给了Apache
参考了Kafka实现基于Java 消息中间件

消息传输最快

RabbitMQ是一个开源的AMQP实现,服务端用Erlang语言编写,支持多种客户端,如:python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

涉及角色:

可以基于Docker安装RabbitMQ,记住其端口:
15672:网页版可视化服务器数据
5672:客户端连接的端口号

点对点消息
一个消息只能消费一次
只需要队列就可以,不需要交换机
消息发送者和消息接收者者可以不同时在线

RabbitMQ特色就在于Exchange,主要有以下类型:
fanout:只要有消息就转发给绑定的队列,不会进行消息的路由判断
direct:会根据路由匹配规则,将消息发送到指定队列中,注意路由规则不支持特殊字符
topic:会根据路由匹配规则,将消息发送到指定队列中,注意路由规则支持特殊字符,比如:* #

❷ python用的第三方库属于中间件吗

是的。
Django中间件是用来处理Django的请求request和响应response的框架级别的钩子,它是一个轻量,低级别的插件系统,用于全局范围内改变Django的输入,输出,每个中间件组件都负责做一些特定的功能。

❸ 从 0 到 1:全面理解 RPC 远程调用

作者 | Python编程时光

责编 | 胡巍巍

什么是RPC呢?网络给出的解释是这样的:“RPC(Remote Procere Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议”。

这个概念听起来还是比较抽象,没关系,继续往后看,后面概念性的东西,我会讲得足够清楚,让你完全掌握 RPC 的基础内容。

在 OpenStack 里的进程间通信方式主要有两种,一种是基于HTTP协议的RESTFul API方式,另一种则是RPC调用。

那么这两种方式在应用场景上有何区别呢?

有使用经验的人,就会知道:

首先,给你提两个问题,带着这两个问题再往下看:

1、RPC 和 REST 区别是什么?2、为什么要采用RPC呢?

首先,第一个问题:RPC 和 REST 区别是什么?

你一定会觉得这个问题很奇怪,是的,包括我,但是你在网络上一搜,会发现类似对比的文章比比皆是,我在想可能很多初学者由于基础不牢固,才会将不相干的二者拿出来对比吧。既然是这样,那为了让你更加了解陌生的RPC,就从你熟悉得不能再熟悉的 REST 入手吧。

01、所属类别不同

REST,是Representational State Transfer 的简写,中文描述表述性状态传递(是指某个瞬间状态的资源数据的快照,包括资源数据的内容、表述格式(XML、JSON)等信息。)

REST 是一种软件架构风格。这种风格的典型应用,就是HTTP。其因为简单、扩展性强的特点而广受开发者的青睐。

而RPC 呢,是 Remote Procere Call Protocol 的简写,中文描述是远程过程调用,它可以实现客户端像调用本地服务(方法)一样调用服务器的服务(方法)。

而 RPC 可以基于 TCP/UDP,也可以基于 HTTP 协议进行传输的,按理说它和REST不是一个层面意义上的东西,不应该放在一起讨论,但是谁让REST这么流行呢,它是目前最流行的一套互联网应用程序的API设计标准,某种意义下,我们说 REST 可以其实就是指代 HTTP 协议。

02、使用方式不同

03、面向对象不同

从设计上来看,RPC,所谓的远程过程调用 ,是面向方法的 ,REST:所谓的 Representational state transfer ,是面向资源的,除此之外,还有一种叫做 SOA,所谓的面向服务的架构,它是面向消息的,这个接触不多,就不多说了。

04、序列化协议不同

接口调用通常包含两个部分,序列化和通信协议。

通信协议,上面已经提及了,REST 是 基于 HTTP 协议,而 RPC 可以基于 TCP/UDP,也可以基于 HTTP 协议进行传输的。

常见的序列化协议,有:json、xml、hession、protobuf、thrift、text、bytes等,REST 通常使用的是 JSON或者XML,而 RPC 使用的是 JSON-RPC,或者 XML-RPC。

通过以上几点,我们知道了 REST 和 RPC 之间有很明显的差异。

然后第二个问题:为什么要采用RPC呢?

那到底为何要使用 RPC,单纯的依靠RESTful API不可以吗?为什么要搞这么多复杂的协议,渣渣表示真的学不过来了。

关于这一点,以下几点仅是我的个人猜想,仅供交流哈:

说了这么多,我们该如何选择这两者呢?我总结了如下两点,供你参考:

“远程调用”意思就是:被调用方法的具体实现不在程序运行本地,而是在别的某个地方(分布到各个服务器),调用者只想要函数运算的结果,却不需要实现函数的具体细节。

光说不练嘴把式,接下来,我将分别用三种不同的方式全面地让你搞明白 rpc 远程调用是如何实现的。

01、基于 xml-rpc

Python实现 rpc,可以使用标准库里的 SimpleXMLRPCServer,它是基于XML-RPC 协议的。

有了这个模块,开启一个 rpc server,就变得相当简单了。执行以下代码:

有了 rpc server,接下来就是 rpc client,由于我们上面使用的是 XML-RPC,所以 rpc clinet 需要使用xmlrpclib 这个库。

然后,我们通过 server_proxy 对象就可以远程调用之前的rpc server的函数了。

SimpleXMLRPCServer是一个单线程的服务器。这意味着,如果几个客户端同时发出多个请求,其它的请求就必须等待第一个请求完成以后才能继续。

若非要使用 SimpleXMLRPCServer 实现多线程并发,其实也不难。只要将代码改成如下即可。

02、基于json-rpc

SimpleXMLRPCServer 是基于 xml-rpc 实现的远程调用,上面我们也提到 除了 xml-rpc 之外,还有 json-rpc 协议。

那 python 如何实现基于 json-rpc 协议呢?

答案是很多,很多web框架其自身都自己实现了json-rpc,但我们要独立这些框架之外,要寻求一种较为干净的解决方案,我查找到的选择有两种

第一种是 jsonrpclib

第二种是 python-jsonrpc

先来看第一种 jsonrpclib

它与 Python 标准库的 SimpleXMLRPCServer 很类似(因为它的类名就叫做 SimpleJSONRPCServer ,不明真相的人真以为它们是亲兄弟)。或许可以说,jsonrpclib 就是仿照 SimpleXMLRPCServer 标准库来进行编写的。

它的导入与 SimpleXMLRPCServer 略有不同,因为SimpleJSONRPCServer分布在jsonrpclib库中。

服务端

客户端

再来看第二种python-jsonrpc,写起来貌似有些复杂。

服务端

客户端

调用过程如下

还记得上面我提到过的 zabbix API,因为我有接触过,所以也拎出来讲讲。zabbix API 也是基于 json-rpc 2.0协议实现的。

因为内容较多,这里只带大家打个,zabbix 是如何调用的:直接指明要调用 zabbix server 的哪个方法,要传给这个方法的参数有哪些。

03、基于 zerorpc

以上介绍的两种rpc远程调用方式,如果你足够细心,可以发现他们都是http+rpc 两种协议结合实现的。

接下来,我们要介绍的这种(zerorpc),就不再使用走 http 了。

zerorpc 这个第三方库,它是基于TCP协议、 ZeroMQ 和 MessagePack的,速度相对快,响应时间短,并发高。zerorpc 和 pyjsonrpc 一样,需要额外安装,虽然SimpleXMLRPCServer不需要额外安装,但是SimpleXMLRPCServer性能相对差一些。

调用过程如下

客户端除了可以使用zerorpc框架实现代码调用之外,它还支持使用“命令行”的方式调用。

客户端可以使用命令行,那服务端是不是也可以呢?

是的,通过 Github 上的文档几个 demo 可以体验到这个第三方库做真的是优秀。

比如我们可以用下面这个命令,创建一个rpc server,后面这个 time Python 标准库中的 time 模块,zerorpc 会将 time 注册绑定以供client调用。

经过了上面的学习,我们已经学会了如何使用多种方式实现rpc远程调用。

通过对比,zerorpc 可以说是脱颖而出,一支独秀。

为此,我也做了一番思考:

OpenStack 组件繁多,在一个较大的集群内部每个组件内部通过rpc通信频繁,如果都采用rpc直连调用的方式,连接数会非常地多,开销大,若有些 server 是单线程的模式,超时会非常的严重。

OpenStack 是复杂的分布式集群架构,会有多个 rpc server 同时工作,假设有 server01,server02,server03 三个server,当 rpc client 要发出rpc请求时,发给哪个好呢?这是问题一。

你可能会说轮循或者随机,这样对大家都公平。这样的话还会引出另一个问题,倘若请求刚好发到server01,而server01刚好不凑巧,可能由于机器或者其他因为导致服务没在工作,那这个rpc消息可就直接失败了呀。要知道做为一个集群,高可用是基本要求,如果出现刚刚那样的情况其实是很尴尬的。这是问题二。

集群有可能根据实际需要扩充节点数量,如果使用直接调用,耦合度太高,不利于部署和生产。这是问题三。

引入消息中间件,可以很好的解决这些问题。

解决问题一:消息只有一份,接收者由AMQP的负载算法决定,默认为在所有Receiver中均匀发送(round robin)。

解决问题二:有了消息中间件做缓冲站,client 可以任性随意的发,server 都挂掉了?没有关系,等 server 正常工作后,自己来消息中间件取就行了。

解决问题三:无论有多少节点,它们只要认识消息中间件这一个中介就足够了。

既然讲到了消息队列,如果你之前没有接触过这块内容,最好花几分钟的时间跟我好好过下关于消息队列的几个基础概念。

首先,RPC只是定义了一个通信接口,其底层的实现可以各不相同,可以是 socket,也可以是今天要讲的 AMQP。

AMQP(Advanced Message Queuing Protocol)是一种基于队列的可靠消息服务协议,作为一种通信协议,AMQP同样存在多个实现,如Apache Qpid,RabbitMQ等。

以下是 AMQP 中的几个必知的概念:

Publisher:消息发布者

Queue:用来保存消息的存储空间,消息没有被receiver前,保存在队列中。

Exchange:用来接收Publisher发出的消息,根据Routing key 转发消息到对应的Message Queue中,至于转到哪个队列里,这个路由算法又由exchange type决定的。

Exchange type:主要四种描述exchange的类型。

direct:消息路由到满足此条件的队列中(queue,可以有多个):routing key = binding key

topic:消息路由到满足此条件的队列中(queue,可以有多个):routing key 匹配 binding pattern. binding pattern是类似正则表达式的字符串,可以满足复杂的路由条件。

fanout:消息路由到多有绑定到该exchange的队列中。

binding :binding是用来描述exchange和queue之间的关系的概念,一个exchang可以绑定多个队列,这些关系由binding建立。前面说的binding key /binding pattern也是在binding中给出。

为了让你明白这几者的关系,我画了一张模型图。

关于AMQP,有几下几点值得注意:

前面铺垫了那么久,终于到了讲真实应用的场景。在生产中RPC是如何应用的呢?

其他模型我不太清楚,在 OpenStack 中的应用模型是这样的

至于为什么要如此设计,前面我已经给出了自己的观点。

接下来,就是源码解读 OpenStack ,看看其是如何通过rpc进行远程调用的。如若你对此没有兴趣(我知道很多人对此都没有兴趣,所以不浪费大家时间),可以直接跳过这一节,进入下一节。

目前Openstack中有两种RPC实现,一种是在oslo messaging,一种是在openstack.common.rpc。

openstack.common.rpc是旧的实现,oslo messaging是对openstack.common.rpc的重构。openstack.common.rpc在每个项目中都存在一份拷贝,oslo messaging即将这些公共代码抽取出来,形成一个新的项目。oslo messaging也对RPC API 进行了重新设计,对多种 transport 做了进一步封装,底层也是用到了kombu这个AMQP库。(注:Kombu 是Python中的messaging库。Kombu旨在通过为AMQ协议提供惯用的高级接口,使Python中的消息传递尽可能简单,并为常见的消息传递问题提供经过验证和测试的解决方案。)

关于oslo_messaging库,主要提供了两种独立的API:

因为 notify 实现是太简单了,所以这里我就不多说了,如果有人想要看这方面内容,可以收藏我的博客(http://python-online.cn) ,我会更新补充 notify 的内容。

OpenStack RPC 模块提供了 rpc.call,rpc.cast, rpc.fanout_cast 三种 RPC 调用方法,发送和接收 RPC 请求。

rpc.call 和 .rpc.cast 从实现代码上看,他们的区别很小,就是call调用时候会带有wait_for_reply=True参数,而cast不带。

要了解 rpc 的调用机制呢,首先要知道 oslo_messaging 的几个概念主要方法有四个:

transport:RPC功能的底层实现方法,这里是rabbitmq的消息队列的访问路径

transport 就是定义你如何访连接消息中间件,比如你使用的是 Rabbitmq,那在 nova.conf中应该有一行transport_url的配置,可以很清楚地看出指定了 rabbitmq 为消息中间件,并配置了连接rabbitmq的user,passwd,主机,端口。

target用来表述 RPC 服务器监听topic,server名称和server监听的exchange,是否广播fanout。

rpc server 要获取消息,需要定义target,就像一个门牌号一样。

rpc client 要发送消息,也需要有target,说明消息要发到哪去。

endpoints:是可供别人远程调用的对象

RPC服务器暴露出endpoint,每个 endpoint 包涵一系列的可被远程客户端通过 transport 调用的方法。直观理解,可以参考nova-conctor创建rpc server的代码,这边的endpoints就是 nova/manager.py:ConctorManager

dispatcher:分发器,这是 rpc server 才有的概念

只有通过它 server 端才知道接收到的rpc请求,要交给谁处理,怎么处理?

在client端,是这样指定要调用哪个方法的。

而在server端,是如何知道要执行这个方法的呢?这就是dispatcher 要干的事,它从 endpoint 里找到这个方法,然后执行,最后返回。

Serializer:在 python 对象和message(notification) 之间数据做序列化或是反序列化的基类。

主要方法有四个:

每个notification listener都和一个executor绑定,来控制收到的notification如何分配。默认情况下,使用的是blocking executor(具体特性参加executor一节)

模仿是一种很高效的学习方法,我这里根据 OpenStack 的调用方式,抽取出核心内容,写成一个简单的 demo,有对 OpenStack 感兴趣的可以了解一下,大部分人也可以直接跳过这章节。

注意以下代码不能直接运行,你还需要配置 rabbitmq 的连接方式,你可以写在配置文件中,通过 get_transport 从cfg.CONF 中读取,也可以直接将其写成url的格式做成参数,传给 get_transport 。而且还要nova或者其他openstack组件的环境中运行(因为需要有ctxt这个环境变量)

简单的 rpc client

简单的 rpc server

【End】

热 文 推 荐

☞Facebook 发币 Libra;谷歌十亿美金为穷人造房;第四代树莓派 Raspberry Pi 4 发布 | 开发者周刊

☞WebRTC 将一统实时音视频天下?

☞小米崔宝秋:小米 AIoT 深度拥抱开源

☞华为在美研发机构 Futurewei 意欲分家?

☞老司机教你如何写出没人敢维护的代码!

☞Python有哪些技术上的优点?比其他语言好在哪儿?

☞上不了北大“图灵”、清华“姚班”,AI专业还能去哪上?

☞公链史记 | 从鸿蒙初辟到万物生长的十年激荡……

☞边缘计算容器化是否有必要?

☞马云曾经偶像,终于把阿里留下的1400亿败光了!

你点的每个“在看”,我都认真当成了喜欢

❹ 消息中间件(一)MQ详解及四大MQ比较

一、消息中间件相关知识

1、概述

消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。

2、消息中间件的组成

2.1 Broker

消息服务器,作为server提供消息核心服务

2.2 Procer

消息生产者,业务的发起方,负责生产消息传输给broker,

2.3 Consumer

消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理

2.4 Topic

2.5 Queue

2.6 Message

消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输

3 消息中间件模式分类

3.1 点对点

PTP点对点:使用queue作为通信载体

说明:

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。

消息被消费以后,queue中不再存储,所以消息消费者不可能消费到已经被消费的消息。 Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

说明:

queue实现了负载均衡,将procer生产的消息发送到消息队列中,由多个消费者消费。但一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者。

4 消息中间件的优势

4.1 系统解耦

交互系统之间没有直接的调用关系,只是通过消息传输,故系统侵入性不强,耦合度低。

4.2 提高系统响应时间

例如原来的一套逻辑,完成支付可能涉及先修改订单状态、计算会员积分、通知物流配送几个逻辑才能完成;通过MQ架构设计,就可将紧急重要(需要立刻响应)的业务放到该调用方法中,响应要求不高的使用消息队列,放到MQ队列中,供消费者处理。

4.3 为大数据处理架构提供服务

通过消息作为整合,大数据的背景下,消息队列还与实时处理架构整合,为数据处理提供性能支持。

4.4 Java消息服务——JMS

Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

5 消息中间件应用场景

5.1 异步通信

有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

5.2 解耦

降低工程间的强依赖程度,针对异构系统进行适配。在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

5.3 冗余

有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

5.4 扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。便于分布式扩容。

5.5 过载保护

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量无法提取预知;如果以为了能处理这类瞬间峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

5.6 可恢复性

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

5.7 顺序保证

在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。

5.8 缓冲

在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率的执行,该缓冲有助于控制和优化数据流经过系统的速度。以调节系统响应时间。

5.9 数据流处理

分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择。

6 消息中间件常用协议

6.1 AMQP协议

AMQP即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

优点:可靠、通用

6.2 MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

优点:格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统

6.3 STOMP协议

STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。

优点:命令模式(非topicqueue模式)

6.4 XMPP协议

XMPP(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。

优点:通用公开、兼容性强、可扩展、安全性高,但XML编码格式占用带宽大

6.5 其他基于TCP/IP自定义的协议

有些特殊框架(如:redis、kafka、zeroMq等)根据自身需要未严格遵循MQ规范,而是基于TCPIP自行封装了一套协议,通过网络socket接口进行传输,实现了MQ的功能。

7 常见消息中间件MQ介绍

7.1 RocketMQ

阿里系下开源的一款分布式、队列模型的消息中间件,原名Metaq,3.0版本名称改为RocketMQ,是阿里参照kafka设计思想使用java实现的一套mq。同时将阿里系内部多款mq产品(Notify、metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下mq的架构,目前主要多用于订单交易系统。

具有以下特点:

官方提供了一些不同于kafka的对比差异:

https://rocketmq.apache.org/docs/motivation/

7.2 RabbitMQ

使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。

7.3 ActiveMQ

Apache下的一个子项目。使用Java完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,少量代码就可以高效地实现高级应用场景。可插拔的传输协议支持,比如:in-VM, TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports。RabbitMQ、ZeroMQ、ActiveMQ均支持常用的多种语言客户端 C++、Java、.Net,、Python、 Php、 Ruby等。

7.4 Redis

使用C语言开发的一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。

7.5 Kafka

Apache下的一个子项目,使用scala实现的一个高性能分布式Publish/Subscribe消息队列系统,具有以下特性:

7.6 ZeroMQ

号称最快的消息队列系统,专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常使用,偏重于实时数据通信场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,开发成本高。因此ZeroMQ具有一个独特的非中间件的模式,更像一个socket library,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序本身就是使用ZeroMQ API完成逻辑服务的角色。但是ZeroMQ仅提供非持久性的队列,如果down机,数据将会丢失。如:Twitter的Storm中使用ZeroMQ作为数据流的传输。

ZeroMQ套接字是与传输层无关的:ZeroMQ套接字对所有传输层协议定义了统一的API接口。默认支持 进程内(inproc) ,进程间(IPC) ,多播,TCP协议,在不同的协议之间切换只要简单的改变连接字符串的前缀。可以在任何时候以最小的代价从进程间的本地通信切换到分布式下的TCP通信。ZeroMQ在背后处理连接建立,断开和重连逻辑。

特性:

二、主要消息中间件的比较

❺ 关于activemq的python客户端

指点就做不到了。不过5年前用过,挺稳定的,速度也好。 它有现成的API,你只需要将样例拿过来改一改就可以用了。

中间件这概念似乎是很古老的东西了,你能用中间件,说明你基础很好,这个应该不是问题。

❻ python后端开发需要学什么

第一阶段:Python语言基础


主要学习Python最基础知识,如Python3、数据类型、字符串、函数、类、文件操作等。阶段课程结束后,学员需要完成Pygame实战飞机大战、2048等项目。


第二阶段:Python语言高级


主要学习Python库、正则表达式、进程线程、爬虫、遍历以及MySQL数据库。


第三阶段:Pythonweb开发


主要学习HTML、CSS、JavaScript、jQuery等前端知识,掌握python三大后端框架(Django、 Flask以及Tornado)。需要完成网页界面设计实战;能独立开发网站。


第四阶段:linux基础


主要学习Linux相关的各种命令,如文件处理命令、压缩解压命令、权限管理以及Linux Shell开发等。


第五阶段:Linux运维自动化开发


主要学习Python开发Linux运维、Linux运维报警工具开发、Linux运维报警安全审计开发、Linux业务质量报表工具开发、Kali安全检测工具检测以及Kali 密码破解实战。


第六阶段:Python爬虫


主要学习python爬虫技术,掌握多线程爬虫技术,分布式爬虫技术。


第七阶段:Python数据分析和大数据


主要学习numpy数据处理、pandas数据分析、matplotlib数据可视化、scipy数据统计分析以及python 金融数据分析;Hadoop HDFS、python Hadoop MapRece、python Spark core、python Spark SQL以及python Spark MLlib。


第八阶段:Python机器学习


主要学习KNN算法、线性回归、逻辑斯蒂回归算法、决策树算法、朴素贝叶斯算法、支持向量机以及聚类k-means算法。


关于python后端开发需要学什么的内容,青藤小编就和您分享到这里了。如果您对python编程有浓厚的兴趣,希望这篇文章可以为您提供帮助。如果您还想了解更多关于python编程的技巧及素材等内容,可以点击本站的其他文章进行学习。

❼ 用Python操作nanomsg(一)——准备

日前因工作需要,整在一点一点熟悉开源跨平台消息中间件: nanomsg ,恰逢最近安装了 Typora 用于练习Markdown语法,那就一并把学习总结整理记录下来并同步更新到方便他人和自己日后回看。

nnpy是其中一个对nanomsg的python wrapper,相比于nanomsg-python日渐缺少维护,更推荐使用nnpy。另外,现在也有了nng(nanomsg next negeration),当nanomsg使用熟练后可考虑转nng。

本文基于Pyhton3.7,当前nnpy的最新版本为 1.4.2 ,依次安装cmake、nanomsg、cffi和nnpy:

这里使用的开发环境为 Jetbrains Pycharm 2019 + WSL ,WSL使用的是Kali-Linux,其他版本如Ubuntu、Debian等也都可以。

我本机装的是Python 3.6,点击右下角当前正在使用的本地解析器名称 Python 3.6 ,选择 Add Interpreter

从左侧选择 WSL 后,右侧面板自动出来当前的WSL发行版本,注意的是这里默认的解析器路径为/usr/bin/python,Kali-Linux默认安装的时候只有python3没有python,需要手动改为 /usr/bin/python3

而后点击 OK 完成WSL Interpreter的添加,在右下角选择 3.7@Kali Linux 即可启用WSL作为远程开发环境——不需要SSH、虚拟机或VPS就能在Windows下进行Linux开发,简直不要太舒服!!

nanomsg提供了如下几种通信模式,太具体的不介绍,说完会用就明白是怎么回事儿了:

PipeLine

PushPub

Pair

ReqRep

Survey

Bus :

关于各通信模式的验证请前往本系列后续文章:

❽ 如何用 Python 构建一个简单的分布式系统

分布式爬虫概览
何谓分布式爬虫?
通俗的讲,分布式爬虫就是多台机器多个
spider
对多个
url
的同时处理问题,分布式的方式可以极大提高程序的抓取效率。
构建分布式爬虫通畅需要考虑的问题
(1)如何能保证多台机器同时抓取同一个URL?
(2)如果某个节点挂掉,会不会影响其它节点,任务如何继续?
(3)既然是分布式,如何保证架构的可伸缩性和可扩展性?不同优先级的抓取任务如何进行资源分配和调度?
基于上述问题,我选择使用celery作为分布式任务调度工具,是分布式爬虫中任务和资源调度的核心模块。它会把所有任务都通过消息队列发送给各个分布式节点进行执行,所以可以很好的保证url不会被重复抓取;它在检测到worker挂掉的情况下,会尝试向其他的worker重新发送这个任务信息,这样第二个问题也可以得到解决;celery自带任务路由,我们可以根据实际情况在不同的节点上运行不同的抓取任务(在实战篇我会讲到)。本文主要就是带大家了解一下celery的方方面面(有celery相关经验的同学和大牛可以直接跳过了)
Celery知识储备
celery基础讲解
按celery官网的介绍来说
Celery
是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。
下面几个关于celery的核心知识点
broker:翻译过来叫做中间人。它是一个消息传输的中间件,可以理解为一个邮箱。每当应用程序调用celery的异步任务的时候,会向broker传递消息,而后celery的worker将会取到消息,执行相应程序。这其实就是消费者和生产者之间的桥梁。
backend:
通常程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果。
worker:
Celery类的实例,作用就是执行各种任务。注意在celery3.1.25后windows是不支持celery
worker的!
procer:
发送任务,将其传递给broker
beat:
celery实现的定时任务。可以将其理解为一个procer,因为它也是通过网络调用定时将任务发送给worker执行。注意在windows上celery是不支持定时任务的!
下面是关于celery的架构示意图,结合上面文字的话应该会更好理解
由于celery只是任务队列,而不是真正意义上的消息队列,它自身不具有存储数据的功能,所以broker和backend需要通过第三方工具来存储信息,celery官方推荐的是
RabbitMQ和Redis,另外mongodb等也可以作为broker或者backend,可能不会很稳定,我们这里选择Redis作为broker兼backend。
实际例子
先安装celery
pip
install
celery
我们以官网给出的例子来做说明,并对其进行扩展。首先在项目根目录下,这里我新建一个项目叫做celerystudy,然后切换到该项目目录下,新建文件tasks.py,然后在其中输入下面代码
这里我详细讲一下代码:我们先通过app=Celery()来实例化一个celery对象,在这个过程中,我们指定了它的broker,是redis的db
2,也指定了它的backend,是redis的db3,
broker和backend的连接形式大概是这样
redis://:password@hostname:port/db_number
然后定义了一个add函数,重点是@app.task,它的作用在我看来就是将add()
注册为一个类似服务的东西,本来只能通过本地调用的函数被它装饰后,就可以通过网络来调用。这个tasks.py中的app就是一个worker。它可以有很多任务,比如这里的任务函数add。我们再通过在命令行切换到项目根目录,执行
celery
-A
tasks
worker
-l
info
启动成功后就是下图所示的样子
这里我说一下各个参数的意思,-A指定的是app(即Celery实例)所在的文件模块,我们的app是放在tasks.py中,所以这里是
tasks;worker表示当前以worker的方式运行,难道还有别的方式?对的,比如运行定时任务就不用指定worker这个关键字;
-l
info表示该worker节点的日志等级是info,更多关于启动worker的参数(比如-c、-Q等常用的)请使用
celery
worker
--help
进行查看
将worker启动起来后,我们就可以通过网络来调用add函数了。我们在后面的分布式爬虫构建中也是采用这种方式分发和消费url的。在命令行先切换到项目根目录,然后打开python交互端
from
tasks
import
addrs
=
add.delay(2,
2)
这里的add.delay就是通过网络调用将任务发送给add所在的worker执行,这个时候我们可以在worker的界面看到接收的任务和计算的结果。
这里是异步调用,如果我们需要返回的结果,那么要等rs的ready状态true才行。这里add看不出效果,不过试想一下,如果我们是调用的比较占时间的io任务,那么异步任务就比较有价值了
上面讲的是从Python交互终端中调用add函数,如果我们要从另外一个py文件调用呢?除了通过import然后add.delay()这种方式,我们还可以通过send_task()这种方式,我们在项目根目录另外新建一个py文件叫做
excute_tasks.py,在其中写下如下的代码
from
tasks
import
addif
__name__
==
'__main__':
add.delay(5,
10)
这时候可以在celery的worker界面看到执行的结果
此外,我们还可以通过send_task()来调用,将excute_tasks.py改成这样
这种方式也是可以的。send_task()还可能接收到为注册(即通过@app.task装饰)的任务,这个时候worker会忽略这个消息
定时任务
上面部分讲了怎么启动worker和调用worker的相关函数,这里再讲一下celery的定时任务。
爬虫由于其特殊性,可能需要定时做增量抓取,也可能需要定时做模拟登陆,以防止cookie过期,而celery恰恰就实现了定时任务的功能。在上述基础上,我们将tasks.py文件改成如下内容
然后先通过ctrl+c停掉前一个worker,因为我们代码改了,需要重启worker才会生效。我们再次以celery
-A
tasks
worker
-l
info这个命令开启worker。
这个时候我们只是开启了worker,如果要让worker执行任务,那么还需要通过beat给它定时发送,我们再开一个命令行,切换到项目根目录,通过
这样就表示定时任务已经开始运行了。
眼尖的同学可能看到我这里celery的版本是3.1.25,这是因为celery支持的windows最高版本是3.1.25。由于我的分布式微博爬虫的worker也同时部署在了windows上,所以我选择了使用
3.1.25。如果全是linux系统,建议使用celery4。
此外,还有一点需要注意,在celery4后,定时任务(通过schele调度的会这样,通过crontab调度的会马上执行)会在当前时间再过定时间隔执行第一次任务,比如我这里设置的是60秒的间隔,那么第一次执行add会在我们通过celery
beat
-A
tasks
-l
info启动定时任务后60秒才执行;celery3.1.25则会马上执行该任务

❾ python爬取大量数据(百万级)

当用python爬取大量网页获取想要的数据时,最重要的问题是爬虫中断问题,python这种脚本语言,一中断

进程就会退出,怎么在中断后继续上次爬取的任务就至关重要了。这里就重点剖析这个中断问题。

第一个问题: 简单点的用动态代理池就能解决,在爬取大量数据的时候,为了速度不受影响,建议使用一些缓

存的中间件将有效的代理 ip 缓存起来,并定时更新。这里推荐 github 这个仓库

https://github.com/jhao104/proxy_pool , 它会做ip有效性验证并将 ip 放入 redis ,不过实现过于复杂

了,还用到了 db ,个人觉得最好自己修改一下。困难点的就是它会使用别的请求来进行判断当前的ip是否

是爬虫,当我们过于聚焦我们的爬虫请求而忽略了其他的请求时,可能就会被服务器判定为爬虫,进而这个ip

会被列入黑名单,而且你换了ip一样也会卡死在这里。这种方式呢,简单点就用 selenium + chrome 一个一个

去爬,不过速度太慢了。还是自己去分析吧,也不会过复杂的。

第二个问题: 网络连接超时是大概率会遇到的问题,有可能是在爬取的时候本地网络波动,也有可能是爬

取的服务端对ip做了限制,在爬取到了一定量级的时候做一些延迟的操作,使得一些通用的 http 库超时

urllib )。不过如果是服务端动的手脚一般延迟不会太高,我们只需要人为的设置一个高一点的

timeout 即可(30 秒),最好在爬取开始的时候就对我们要用的爬取库进行一层封装,通用起来才好改

动。

第三个问题: 在解析大量静态页面的时候,有些静态页面的解析规则不一样,所以我们就必须得做好断点

续爬的准备了( PS : 如果简单的忽略错误可能会导致大量数据的丢失,这就不明智了)。那么在调试的过

程中断点续爬有个解决方案,就是生产者和消费者分离,生产者就是产生待爬 url 的爬虫,消费者就是爬取

最终数据的爬虫。最终解析数据就是消费者爬虫了。他们通过消息中间件连接,生产者往消息中间件发送待

爬取的目标信息,消费者从里面取就行了,还间接的实现了个分布式爬取功能。由于现在的消费中间件都有

ack 机制,一个消费者爬取链接失败会导致消息消费失败,进而分配给其他消费者消费。所以消息丢失的

概率极低。不过这里还有个 tips , 消费者的消费超时时间不能太长,会导致消息释放不及时。还有要开启

消息中间价的数据持久化功能,不然消息产生过多而消费不及时会撑爆机器内存。那样就得不偿失了。

第四个问题: 这种情况只能 try except catch 住了,不好解决,如果单独分析的话会耗费点时间。但在

大部分数据 (99%) 都正常的情况下就这条不正常抛弃就行了。主要有了第三个问题的解决方案再出现这

种偶尔中断的问就方便多了。

希望能帮到各位。

阅读全文

与python消息中间件相关的资料

热点内容
dvd光盘存储汉子算法 浏览:757
苹果邮件无法连接服务器地址 浏览:963
phpffmpeg转码 浏览:671
长沙好玩的解压项目 浏览:145
专属学情分析报告是什么app 浏览:564
php工程部署 浏览:833
android全屏透明 浏览:737
阿里云服务器已开通怎么办 浏览:803
光遇为什么登录时服务器已满 浏览:302
PDF分析 浏览:485
h3c光纤全工半全工设置命令 浏览:143
公司法pdf下载 浏览:382
linuxmarkdown 浏览:350
华为手机怎么多选文件夹 浏览:683
如何取消命令方块指令 浏览:350
风翼app为什么进不去了 浏览:778
im4java压缩图片 浏览:362
数据查询网站源码 浏览:150
伊克塞尔文档怎么进行加密 浏览:892
app转账是什么 浏览:163