⑴ python 怎么做分布式for循环
用Python的for循环实现等差序信亏列相加,例如此坦消(1+2+3+4+5+...+20)这样的算法森知,代码如下: import mathtemp = 0for x in range(1,20): temp += x break else: print temp #输入结果:210
⑵ 如何用 Python 构建一个简单的分布式系统
分布式爬虫概览
何谓分布式爬虫?
通俗的讲,分布式爬虫就是多台机器多个
spider
对多个
url
的同时处理问题,分布式的方式可以极大提高程序的抓取效率。
构建分布式爬虫通畅需要考虑的问题
(1)如何能保证多台机器同时抓取同一个URL?
(2)如果某个节点挂掉,会不会影响其它节点,任务如何继续?
(3)既然是分布式,如何保证架构的可伸缩性和可扩展性?不同优先级的抓取任务如何进行资源分配和调度?
基于上述问题,我选择使用celery作为分布式任务调度工具,是分布式爬虫中任务和资源调度的核心模块。它会把所有任务都通过消息队列发送给各个分布式节点进行执行,所以可以很好的保证url不会被重复抓取;它在检测到worker挂掉的情况下,会尝试向其他的worker重新发送这个任务信息,这样第二个问题也可以得到解决;celery自带任务路由,我们可以根据实际情况在不同的节点上运行不同的抓取任务(在实战篇我会讲到)。本文主要就是带大家了解一下celery的方方面面(有celery相关经验的同学和大牛可以直接跳过了)
Celery知识储备
celery基础讲解
按celery官网的介绍来说
Celery
是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。
下面几个关于celery的核心知识点
broker:翻译过来叫做中间人。它是一个消息传输的中间件,可以理解为一个邮箱。每当应用程序调用celery的异步任务的时候,会向broker传递消息,而后celery的worker将会取到消息,执行相应程序。这其实就是消费者和生产者之间的桥梁。
backend:
通常程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果。
worker:
Celery类的实例,作用就是执行各种任务。注意在celery3.1.25后windows是不支持celery
worker的!
procer:
发送任务,将其传递给broker
beat:
celery实现的定时任务。可以将其理解为一个procer,因为它也是通过网络调用定时将任务发送给worker执行。注意在windows上celery是不支持定时任务的!
下面是关于celery的架构示意图,结合上面文字的话应该会更好理解
由于celery只是任务队列,而不是真正意义上的消息队列,它自身不具有存储数据的功能,所以broker和backend需要通过第三方工具来存储信息,celery官方推荐的是
RabbitMQ和Redis,另外mongodb等也可以作为broker或者backend,可能不会很稳定,我们这里选择Redis作为broker兼backend。
实际例子
先安装celery
pip
install
celery
我们以官网给出的例子来做说明,并对其进行扩展。首先在项目根目录下,这里我新建一个项目叫做celerystudy,然后切换到该项目目录下,新建文件tasks.py,然后在其中输入下面代码
这里我详细讲一下代码:我们先通过app=Celery()来实例化一个celery对象,在这个过程中,我们指定了它的broker,是redis的db
2,也指定了它的backend,是redis的db3,
broker和backend的连接形式大概是这样
redis://:password@hostname:port/db_number
然后定义了一个add函数,重点是@app.task,它的作用在我看来就是将add()
注册为一个类似服务的东西,本来只能通过本地调用的函数被它装饰后,就可以通过网络来调用。这个tasks.py中的app就是一个worker。它可以有很多任务,比如这里的任务函数add。我们再通过在命令行切换到项目根目录,执行
celery
-A
tasks
worker
-l
info
启动成功后就是下图所示的样子
这里我说一下各个参数的意思,-A指定的是app(即Celery实例)所在的文件模块,我们的app是放在tasks.py中,所以这里是
tasks;worker表示当前以worker的方式运行,难道还有别的方式?对的,比如运行定时任务就不用指定worker这个关键字;
-l
info表示该worker节点的日志等级是info,更多关于启动worker的参数(比如-c、-Q等常用的)请使用
celery
worker
--help
进行查看
将worker启动起来后,我们就可以通过网络来调用add函数了。我们在后面的分布式爬虫构建中也是采用这种方式分发和消费url的。在命令行先切换到项目根目录,然后打开python交互端
from
tasks
import
addrs
=
add.delay(2,
2)
这里的add.delay就是通过网络调用将任务发送给add所在的worker执行,这个时候我们可以在worker的界面看到接收的任务和计算的结果。
这里是异步调用,如果我们需要返回的结果,那么要等rs的ready状态true才行。这里add看不出效果,不过试想一下,如果我们是调用的比较占时间的io任务,那么异步任务就比较有价值了
上面讲的是从Python交互终端中调用add函数,如果我们要从另外一个py文件调用呢?除了通过import然后add.delay()这种方式,我们还可以通过send_task()这种方式,我们在项目根目录另外新建一个py文件叫做
excute_tasks.py,在其中写下如下的代码
from
tasks
import
addif
__name__
==
'__main__':
add.delay(5,
10)
这时候可以在celery的worker界面看到执行的结果
此外,我们还可以通过send_task()来调用,将excute_tasks.py改成这样
这种方式也是可以的。send_task()还可能接收到为注册(即通过@app.task装饰)的任务,这个时候worker会忽略这个消息
定时任务
上面部分讲了怎么启动worker和调用worker的相关函数,这里再讲一下celery的定时任务。
爬虫由于其特殊性,可能需要定时做增量抓取,也可能需要定时做模拟登陆,以防止cookie过期,而celery恰恰就实现了定时任务的功能。在上述基础上,我们将tasks.py文件改成如下内容
然后先通过ctrl+c停掉前一个worker,因为我们代码改了,需要重启worker才会生效。我们再次以celery
-A
tasks
worker
-l
info这个命令开启worker。
这个时候我们只是开启了worker,如果要让worker执行任务,那么还需要通过beat给它定时发送,我们再开一个命令行,切换到项目根目录,通过
这样就表示定时任务已经开始运行了。
眼尖的同学可能看到我这里celery的版本是3.1.25,这是因为celery支持的windows最高版本是3.1.25。由于我的分布式微博爬虫的worker也同时部署在了windows上,所以我选择了使用
3.1.25。如果全是linux系统,建议使用celery4。
此外,还有一点需要注意,在celery4后,定时任务(通过schele调度的会这样,通过crontab调度的会马上执行)会在当前时间再过定时间隔执行第一次任务,比如我这里设置的是60秒的间隔,那么第一次执行add会在我们通过celery
beat
-A
tasks
-l
info启动定时任务后60秒才执行;celery3.1.25则会马上执行该任务
⑶ 如何用 python 构建一个简单的分布式系统
从GitHub中整理出的15个最受欢迎的Python开源框架。这些框架包括事件I/O,OLAP,Web开发,高性能网络通信,测试,爬虫等。
Django: Python Web应用开发框架
Django 应该是最出名的Python框架,GAE甚至Erlang都有框架受它影响。Django是走大而全的方向,它最出名的是其全自动化的管理后台:只需要使用起ORM,做简单的对象定义,它就能自动生成数据库结构、以及全功能的管理后台。
Diesel:基于Greenlet的事件I/O框架
Diesel提供一个整洁的API来编写网络客户端和服务器。支持TCP和UDP。
Flask:一个用Python编写的轻量级Web应用框架
Flask是一个使用Python编写的轻量级Web应用框架。基于Werkzeug WSGI工具箱和Jinja2
模板引擎。Flask也被称为“microframework”,因为它使用简单的核心,用extension增加其他功能。Flask没有默认使用的数
据库、窗体验证工具。
Cubes:轻量级Python OLAP框架
Cubes是一个轻量级Python框架,包含OLAP、多维数据分析和浏览聚合数据(aggregated data)等工具。
Kartograph.py:创造矢量地图的轻量级Python框架
Kartograph是一个Python库,用来为ESRI生成SVG地图。Kartograph.py目前仍处于beta阶段,你可以在virtualenv环境下来测试。
Pulsar:Python的事件驱动并发框架
Pulsar是一个事件驱动的并发框架,有了pulsar,你可以写出在不同进程或线程中运行一个或多个活动的异步服务器。
Web2py:全栈式Web框架
Web2py是一个为Python语言提供的全功能Web应用框架,旨在敏捷快速的开发Web应用,具有快速、安全以及可移植的数据库驱动的应用,兼容Google App Engine。
Falcon:构建云API和网络应用后端的高性能Python框架
Falcon是一个构建云API的高性能Python框架,它鼓励使用REST架构风格,尽可能以最少的力气做最多的事情。
Dpark:Python版的Spark
DPark是Spark的Python克隆,是一个Python实现的分布式计算框架,可以非常方便地实现大规模数据处理和迭代计算。DPark由豆瓣实现,目前豆瓣内部的绝大多数数据分析都使用DPark完成,正日趋完善。
Buildbot:基于Python的持续集成测试框架
Buildbot是一个开源框架,可以自动化软件构建、测试和发布等过程。每当代码有改变,服务器要求不同平台上的客户端立即进行代码构建和测试,收集并报告不同平台的构建和测试结果。
Zerorpc:基于ZeroMQ的高性能分布式RPC框架
Zerorpc是一个基于ZeroMQ和MessagePack开发的远程过程调用协议(RPC)实现。和 Zerorpc 一起使用的 Service API 被称为 zeroservice。Zerorpc 可以通过编程或命令行方式调用。
Bottle: 微型Python Web框架
Bottle是一个简单高效的遵循WSGI的微型python Web框架。说微型,是因为它只有一个文件,除Python标准库外,它不依赖于任何第三方模块。
Tornado:异步非阻塞IO的Python Web框架
Tornado的全称是Torado Web Server,从名字上看就可知道它可以用作Web服务器,但同时它也是一个Python Web的开发框架。最初是在FriendFeed公司的网站上使用,FaceBook收购了之后便开源了出来。
webpy: 轻量级的Python Web框架
webpy的设计理念力求精简(Keep it simple and powerful),源码很简短,只提供一个框架所必须的东西,不依赖大量的第三方模块,它没有URL路由、没有模板也没有数据库的访问。
Scrapy:Python的爬虫框架
Scrapy是一个使用Python编写的,轻量级的,简单轻巧,并且使用起来非常的方便。
⑷ 哪些分布式文件系统是由Python编写的呢
我知道分布式文件系统完全用Python 写的只有openstack 的swift。
其他还有一些不知名的分布式文件系统用python 写的如:
NCFS(基于多个云存储的分布式文件系统)
一般考虑性能都不会采用python 作为分布式文件系统的开发语言
⑸ 求教Python 如何优雅的分布式调用 phantomJS
这种东西在大批量抓去时主要有下面几个量变引发质变的挑战:
1. 出口IP数量,主要是考虑防止被封禁,带宽反而不是大问题,这个问题可以通过搭建NAT出口集群,或者单机多IP的方式实现
2. 本地端口号耗尽,由于爬虫是服务端编程不太常见的主动发起连接的应用,在普通只有一个IP绑定的机器上会受到65535的限制(一般在50000多就会受到限制)
3. 大容量存储的需求,一般都是通过开源或者自己研发的分布式存储系统来实现,像谷歌(GFS)和网络(百灵)都是自研,这里就不展开说了
4. 动态网页的支持,像京东这种网站,内容都是通
⑹ python基础都有哪些内容呢
阶段一:Python开发基础
Python全栈开发与人工智能之Python开发基础知识学习内容包括:Python基础语法、数据类型、字符编码、文件操作、函数、装饰器、迭代器、内置方法、常用模块等。
阶段二:Python高级编程和数据库开发
Python全栈开发与人工智能之Python高级编程和数据库开发知识学习内容包括:面向对象开发、Socket网络编程、线程、进程、队列、IO多路模型、Mysql数据库开发等。
阶段三:前端开发
Python全栈开发与人工智能之前端开发知识学习内容包括:Html、CSS、JavaScript开发、Jquery&bootstrap开发、前端框架VUE开发等。
阶段四:WEB框架开发
Python全栈开发与人工智能之WEB框架开发学习内容包括:Django框架基础、Django框架进阶、BBS+Blog实战项目开发、缓存和队列中间件、Flask框架学习、Tornado框架学习、Restful API等。
阶段五:爬虫开发
Python全栈开发与人工智能之爬虫开发学习内容包括:爬虫开发实战。
阶段六:全栈项目实战
Python全栈开发与人工智能之全栈项目实战学习内容包括:企业应用工具学习、CRM客户关系管理系统开发、路飞学城在线教育平台开发等。
阶段七:数据分析
Python全栈开发与人工智能之数据分析学习内容包括:金融量化分析。
阶段八:人工智能
Python全栈开发与人工智能之人工智能学习内容包括:机器学习、图形识别、无人机开发、无人驾驶等。
阶段九:自动化运维&开发
Python全栈开发与人工智能之自动化运维&开发学习内容包括:CMDB资产管理系统开发、IT审计+主机管理系统开发、分布式主机监控系统开发等。
阶段十:高并发语言GO开发
Python全栈开发与人工智能之高并发语言GO开发学习内容包括:GO语言基础、数据类型与文件IO操作、函数和面向对象、并发编程等。
这是我校课程大纲,不妨试试!
⑺ Python分布式进程中你会遇到的坑
写在前面
小惊大怪
你是不是在用Python3或者在windows系统上编程?最重要的是你对进程和线程不是很清楚?那么恭喜你,在python分布式进程中,会有坑等着你去挖。。。(hahahaha,此处允许我吓唬一下你)开玩笑的啦,不过,如果你知道序列中不支持匿名函数,那这个坑就和你say byebye了。好了话不多数,直接进入正题。
分布式进程
正如大家所知道的Process比Thread更稳定,而且Process可以分布到多台机器上,而Thread最多只能分布到同一陆坦咐台机器的多个CPU上。Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
代码记录
举个例子
如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上,这应该怎么用分布式进程来实现呢?你已经知道了原有的Queue可以继续使用,而且通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程来访问Queue了。好,那我们就这么干!
写个task_master.py
我们先看服务进程。服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务。
请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。然后,在另一台机器上启动任务进程(本机上启动也可以)
写个task_worker.py
任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。
运行结果
现在,可信没以试试分布式进程的工作效果了。先启动task_master.py服务进程:
task_master.py进程发送完任务后,开始等待result队列的结果。现在启动task_worker.py进程:
看到没,结果都出错了,我们好好分析一下到底哪出错了。。。
错误分析
在task_master.py的报错提示中,我们知道它说lambda错误,这是因为序列化不支持匿名函数,所以我们得修改代码,重新对queue用QueueManager进行封装放到网络中。
其中task_queue和result_queue是两个队列,分别存放任务和结果。它们用来进行进程间通信,交换对象。
因为是分布式的环境,放入queue中的数据需要等待Workers机器运算处理后再进行读取,这样就需要对queue用QueueManager进行封装放到网络中,这是通过上面的2行代码来实现的。我们给return_task_queue的网络调用接口取了一个名早纯get_task_queue,而return_result_queue的名字是get_result_queue,方便区分对哪个queue进行操作。task.put(n)即是对task_queue进行写入数据,相当于分配任务。而result.get()即是等待workers机器处理后返回的结果。
值得注意 在windows系统中你必须要写IP地址,而其他操作系统比如linux操作系统则就不要了。
修改后的代码
在task_master.py中修改如下:
在task_worker.py中修改如下:
先运行task_master.py,然后再运行task_worker.py
(1)task_master.py运行结果如下
(2)task_worker.py运行结果如下
知识补充
这个简单的Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。
Queue对象存储在哪?注意到task_worker.py中根本没有创建Queue的代码,所以,Queue对象存储在task_master.py进程中:
而Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue。task_worker这里的QueueManager注册的名字必须和task_manager中的一样。对比上面的例子,可以看出Queue对象从另一个进程通过网络传递了过来。只不过这里的传递和网络通信由QueueManager完成。
authkey有什么用?这是为了保证两台机器正常通信,不被其他机器恶意干扰。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定连接不上。
⑻ 求python分布式爬虫教学视频
链接: https://pan..com/s/1DSW8IPOuu9XCAyKGy1VZmw
python爬虫课程以扒厅Python语言为基础描述了网络爬虫的基础知识,用大简茄量实际案例及代码,介绍了编写网络爬虫所需要的相关知识要点及项目实践的相关技巧春咐隐。
⑼ Python中的多进程与多线程/分布式该如何使用
Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情。
借助这个包,可以轻松完成从单进程到并发执行的转换。
1、新建单一进程
如果我们新建少量进程,可以如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=("hello", ))
p.start()
p.join()
print "Sub-process done."12345678910111213
2、使用进程池
是的,你没有看错,不是线程池。它可以让你跑满多核CPU,而且使用方法非常简单。
注意要用apply_async,如果落下async,就变成阻塞版本了。
processes=4是最多并发进程数量。
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."12345678910111213141516
3、使用Pool,并需要关注结果
更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
return "done " + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print res.get()
print "Sub-process(es) done."
2014.12.25更新
根据网友评论中的反馈,在Windows下运行有可能崩溃(开启了一大堆新窗口、进程),可以通过如下调用来解决:
multiprocessing.freeze_support()1
附录(自己的脚本):
#!/usr/bin/python
import threading
import subprocess
import datetime
import multiprocessing
def dd_test(round, th):
test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)
command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg
print command
subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)
def mds_stat(round):
p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)
out = p.stdout.readlines()
if out[0].find('active') != -1:
command = "echo '0205pm %s round mds status OK, %s' >> /round_record" %(round, datetime.datetime.now())
command_2 = "time (ls /zbkc/test_mds_crash/) 2>>/round_record"
command_3 = "ls /zbkc/test_mds_crash | wc -l >> /round_record"
subprocess.call(command,shell=True)
subprocess.call(command_2,shell=True)
subprocess.call(command_3,shell=True)
return 1
else:
command = "echo '0205 %s round mds status abnormal, %s, %s' >> /round_record" %(round, out[0], datetime.datetime.now())
subprocess.call(command,shell=True)
return 0
#threads = []
for round in range(1, 1600):
pool = multiprocessing.Pool(processes = 10) #使用进程池
for th in range(10):
# th_name = "thread-" + str(th)
# threads.append(th_name) #添加线程到线程列表
# threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #创建多线程任务
pool.apply_async(dd_test, (round, th))
pool.close()
pool.join()
#等待线程完成
# for t in threads:
# t.join()
if mds_stat(round) == 0:
subprocess.call("zbkc -s",shell=True)
break
⑽ python面试之分布式
主要用于分散压力,所以分布式的服务都是部署在不同的服务器上的,再将服务做集群
根据“分层”的思想进行拆分。
例如,可以将一个项目根据“三层架构” 拆分
然后再分开部署 :
根据业务进行拆分。
例如,可以根据业务逻辑,将“电商项目”拆分成 “订单项目”、“用户项目”和“秒杀项目” 。显然这三个拆分后的项目,仍然可以作为独立的项目使用。像这种拆分的方法,就成为垂直拆分
主要用于分散能力,主要是将服务的颗粒度尽量细化,且自成一脉,压力这块并不是其关注的点,所以多个微服务是可以部署在同一台服务器上的
微服务可以理解为一种 非常细粒度的垂直拆分 。例如,以上“订单项目”本来就是垂直拆分后的子项目,但实际上“订单项目”还能进一步拆分为“购物项目”、“结算项目”和“售后项目”,如图
现在看图中的“订单项目”,它完全可以作为一个分布式项目的组成元素,但就不适合作为微服务的组成元素了(因为它还能再拆,而微服务应该是不能再拆的“微小”服务,类似于“原子性”)
分布式服务需要提供给别的分布式服务去调用,单独拆出来 未必外部可用
微服务自成一脉,可以系统内部调用,也可以单独提供服务
为什么需要用分布式锁,见下图
变量A存在三个服务器内存中(这个变量A主要体现是在一个类中的一个成员变量,是一个有状态的对象),如果不加任何控制的话,变量A同时都会在分配一块内存,三个请求发过来同时对这个变量操作,显然结果是不对的!即使不是同时发过来,三个请求分别操作三个不同内存区域的数据,变量A之间不存在共享,也不具有可见性,处理的结果也是不对的。
分布式锁应该具备哪些条件:
1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
2、高可用的获取锁与释放锁;
3、高性能的获取锁与释放锁;
4、具备可重入特性;
5、具备锁失效机制,防止死锁;
6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
Redis性能高
命令简单,实现方便
使用setnx加锁,key为锁名,value随意不重复就行(一般用uuid)
给锁添加expire时间,超过该时间redis过期(即自动释放锁)
设置获取锁的超时时间,若超过时间,则放弃获取锁
通过锁名获取锁值
比较锁值和当前uuid是否一致,一致则释放锁(通过delete命令删除redis键值对)
2PC:two phase commit protocol,二阶段提交协议,是一种强一致性设计。
同步阻塞(导致长久的资源锁定) ,只有第一阶段全部正常完成(返回失败,回字返回超时都会返回 “准备失败” ),才会进入第二阶段
因为协调者可能会在任意一个时间点(发送准备命令之前,发送准备命令之后,发送回滚事务命令之前,发送回滚事务命令之后,发送提交事务命令之前,发送提交事务命令之后)故障,导致资源阻塞。
T:try,指的是预留,即资源的预留和锁定,注意是预留
C:confirm,指的是确认操作,这一步其实就是真正的执行了
C:cancel,指的是撤销操作,可以理解为把预留阶段的动作撤销了
从思想上看和 2PC 差不多,都是先试探性的执行,如果都可以那就真正的执行,如果不行就回滚。
适用于对实时性要求没那么高的业务场景,如:短信通知