导航:首页 > 编程语言 > python多进程为什么要用进程池

python多进程为什么要用进程池

发布时间:2022-08-08 10:58:54

1. python多进程中队列不空时阻塞,求解为什么

最近接触一个项目,要在多个虚拟机中运行任务,参考别人之前项目的代码,采用了多进程来处理,于是上网查了查python中的多进程

一、先说说Queue(队列对象)

Queue是python中的标准库,可以直接import 引用,之前学习的时候有听过着名的“先吃先拉”与“后吃先吐”,其实就是这里说的队列,队列的构造的时候可以定义它的容量,别吃撑了,吃多了,就会报错,构造的时候不写或者写个小于1的数则表示无限多

import Queue

q = Queue.Queue(10)

向队列中放值(put)

q.put(‘yang')

q.put(4)

q.put([‘yan','xing'])

在队列中取值get()

默认的队列是先进先出的

>>> q.get()
‘yang'
>>> q.get()
4
>>> q.get()
[‘yan', ‘xing']

当一个队列为空的时候如果再用get取则会堵塞,所以取队列的时候一般是用到

get_nowait()方法,这种方法在向一个空队列取值的时候会抛一个Empty异常

所以更常用的方法是先判断一个队列是否为空,如果不为空则取值

队列中常用的方法

Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.get([block[, timeout]]) 获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
非阻塞 Queue.put(item) 写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)

二、multiprocessing中使用子进程概念

from multiprocessing import Process

可以通过Process来构造一个子进程

p = Process(target=fun,args=(args))

再通过p.start()来启动子进程

再通过p.join()方法来使得子进程运行结束后再执行父进程

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':
print 'Parent process %s.' % os.getpid()
p = Process(target=run_proc, args=('test',))
print 'Process will start.'
p.start()
p.join()
print 'Process end.'

上面的程序运行后的结果其实是按照上图中1,2,3分开进行的,先打印1,3秒后打印2,再3秒后打印3

代码中的p.close()是关掉进程池子,是不再向里面添加进程了,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

当时也可以是实例pool的时候给它定义一个进程的多少

如果上面的代码中p=Pool(5)那么所有的子进程就可以同时进行

三、多个子进程间的通信

多个子进程间的通信就要采用第一步中说到的Queue,比如有以下的需求,一个子进程向队列中写数据,另外一个进程从队列中取数据,

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
while True:
if not q.empty():
value = q.get(True)
print 'Get %s from queue.' % value
time.sleep(random.random())
else:
break

if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 等待pw结束:
pw.join()
# 启动子进程pr,读取:
pr.start()
pr.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
print
print '所有数据都写入并且读完'

四、关于上面代码的几个有趣的问题

if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
p = Pool()
pw = p.apply_async(write,args=(q,))
pr = p.apply_async(read,args=(q,))
p.close()
p.join()

print
print '所有数据都写入并且读完'

如果main函数写成上面的样本,本来我想要的是将会得到一个队列,将其作为参数传入进程池子里的每个子进程,但是却得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的错误,查了下,大意是队列对象不能在父进程与子进程间通信,这个如果想要使用进程池中使用队列则要使用multiprocess的Manager类

if __name__=='__main__':
manager = multiprocessing.Manager()
# 父进程创建Queue,并传给各个子进程:
q = manager.Queue()
p = Pool()
pw = p.apply_async(write,args=(q,))
time.sleep(0.5)
pr = p.apply_async(read,args=(q,))
p.close()
p.join()

print
print '所有数据都写入并且读完'

这样这个队列对象就可以在父进程与子进程间通信,不用池则不需要Manager,以后再扩展multiprocess中的Manager类吧

关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁

#coding:gbk

from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random

# 写数据进程执行的代码:
def write(q,lock):
lock.acquire() #加上锁
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
lock.release() #释放锁

# 读数据进程执行的代码:
def read(q):
while True:
if not q.empty():
value = q.get(False)
print 'Get %s from queue.' % value
time.sleep(random.random())
else:
break

if __name__=='__main__':
manager = multiprocessing.Manager()
# 父进程创建Queue,并传给各个子进程:
q = manager.Queue()
lock = manager.Lock() #初始化一把锁
p = Pool()
pw = p.apply_async(write,args=(q,lock))
pr = p.apply_async(read,args=(q,))
p.close()
p.join()

print
print '所有数据都写入并且读完'

2. Python中的多进程与多线程/分布式该如何使用

Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情。
借助这个包,可以轻松完成从单进程到并发执行的转换。
1、新建单一进程
如果我们新建少量进程,可以如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=("hello", ))
p.start()
p.join()
print "Sub-process done."12345678910111213
2、使用进程池
是的,你没有看错,不是线程池。它可以让你跑满多核CPU,而且使用方法非常简单。
注意要用apply_async,如果落下async,就变成阻塞版本了。
processes=4是最多并发进程数量。
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."12345678910111213141516
3、使用Pool,并需要关注结果
更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
return "done " + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print res.get()
print "Sub-process(es) done."
2014.12.25更新
根据网友评论中的反馈,在Windows下运行有可能崩溃(开启了一大堆新窗口、进程),可以通过如下调用来解决:
multiprocessing.freeze_support()1
附录(自己的脚本):
#!/usr/bin/python
import threading
import subprocess
import datetime
import multiprocessing
def dd_test(round, th):
test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)
command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg
print command
subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)
def mds_stat(round):
p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)
out = p.stdout.readlines()
if out[0].find('active') != -1:
command = "echo '0205pm %s round mds status OK, %s' >> /round_record" %(round, datetime.datetime.now())
command_2 = "time (ls /zbkc/test_mds_crash/) 2>>/round_record"
command_3 = "ls /zbkc/test_mds_crash | wc -l >> /round_record"
subprocess.call(command,shell=True)
subprocess.call(command_2,shell=True)
subprocess.call(command_3,shell=True)
return 1
else:
command = "echo '0205 %s round mds status abnormal, %s, %s' >> /round_record" %(round, out[0], datetime.datetime.now())
subprocess.call(command,shell=True)
return 0
#threads = []
for round in range(1, 1600):
pool = multiprocessing.Pool(processes = 10) #使用进程池
for th in range(10):
# th_name = "thread-" + str(th)
# threads.append(th_name) #添加线程到线程列表
# threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #创建多线程任务
pool.apply_async(dd_test, (round, th))
pool.close()
pool.join()
#等待线程完成
# for t in threads:
# t.join()
if mds_stat(round) == 0:
subprocess.call("zbkc -s",shell=True)
break

3. python怎么实现一个进程

想要充分利用多核CPU资源,Python中大部分情况下都需要使用多进程,Python中提供了multiprocessing这个包实现多进程。multiprocessing支持子进程、进程间的同步与通信,提供了Process、Queue、Pipe、Lock等组件。

开辟子进程

multiprocessing中提供了Process类来生成进程实例

Process([group [, target [, name [, args [, kwargs]]]]])1

4. python多进程为什么一定要

前面讲了为什么Python里推荐用多进程而不是多线程,但是多进程也有其自己的限制:相比线程更加笨重、切换耗时更长,并且在python的多进程下,进程数量不推荐超过CPU核心数(一个进程只有一个GIL,所以一个进程只能跑满一个CPU),因为一个进程占用一个CPU时能充分利用机器的性能,但是进程多了就会出现频繁的进程切换,反而得不偿失。
不过特殊情况(特指IO密集型任务)下,多线程是比多进程好用的。
举个例子:给你200W条url,需要你把每个url对应的页面抓取保存起来,这种时候,单单使用多进程,效果肯定是很差的。为什么呢?
例如每次请求的等待时间是2秒,那么如下(忽略cpu计算时间):
1、单进程+单线程:需要2秒*200W=400W秒==1111.11个小时==46.3天,这个速度明显是不能接受的2、单进程+多线程:例如我们在这个进程中开了10个多线程,比1中能够提升10倍速度,也就是大约4.63天能够完成200W条抓取,请注意,这里的实际执行是:线程1遇见了阻塞,CPU切换到线程2去执行,遇见阻塞又切换到线程3等等,10个线程都阻塞后,这个进程就阻塞了,而直到某个线程阻塞完成后,这个进程才能继续执行,所以速度上提升大约能到10倍(这里忽略了线程切换带来的开销,实际上的提升应该是不能达到10倍的),但是需要考虑的是线程的切换也是有开销的,所以不能无限的启动多线程(开200W个线程肯定是不靠谱的)3、多进程+多线程:这里就厉害了,一般来说也有很多人用这个方法,多进程下,每个进程都能占一个cpu,而多线程从一定程度上绕过了阻塞的等待,所以比单进程下的多线程又更好使了,例如我们开10个进程,每个进程里开20W个线程,执行的速度理论上是比单进程开200W个线程快10倍以上的(为什么是10倍以上而不是10倍,主要是cpu切换200W个线程的消耗肯定比切换20W个进程大得多,考虑到这部分开销,所以是10倍以上)。
还有更好的方法吗?答案是肯定的,它就是:
4、协程,使用它之前我们先讲讲what/why/how(它是什么/为什么用它/怎么使用它)what:
协程是一种用户级的轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
在并发编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。
why:
目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。
不管是进程还是线程,每次阻塞、切换都需要陷入系统调用(system call),先让CPU跑操作系统的调度程序,然后再由调度程序决定该跑哪一个进程(线程)。
而且由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点)。
因为协程是用户自己来编写调度逻辑的,对CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。
how:
python里面怎么使用协程?答案是使用gevent,使用方法:看这里使用协程,可以不受线程开销的限制,我尝试过一次把20W条url放在单进程的协程里执行,完全没问题。
所以最推荐的方法,是多进程+协程(可以看作是每个进程里都是单线程,而这个单线程是协程化的)多进程+协程下,避开了CPU切换的开销,又能把多个CPU充分利用起来,这种方式对于数据量较大的爬虫还有文件读写之类的效率提升是巨大的。
小例子:
#-*- coding=utf-8 -*-
import requests
from multiprocessing import Process
import gevent
from gevent import monkey; monkey.patch_all()import sys
reload(sys)
sys.setdefaultencoding('utf8')
def fetch(url):
try:
s = requests.Session()
r = s.get(url,timeout=1)#在这里抓取页面
except Exception,e:
print e
return ''
def process_start(tasks):
gevent.joinall(tasks)#使用协程来执行
def task_start(filepath,flag = 100000):#每10W条url启动一个进程with open(filepath,'r') as reader:#从给定的文件中读取urlurl = reader.readline().strip()
task_list = []#这个list用于存放协程任务
i = 0 #计数器,记录添加了多少个url到协程队列while url!='':
i += 1
task_list.append(gevent.spawn(fetch,url,queue))#每次读取出url,将任务添加到协程队列if i == flag:#一定数量的url就启动一个进程并执行p = Process(target=process_start,args=(task_list,))p.start()
task_list = [] #重置协程队列
i = 0 #重置计数器
url = reader.readline().strip()
if task_list not []:#若退出循环后任务队列里还有url剩余p = Process(target=process_start,args=(task_list,))#把剩余的url全都放到最后这个进程来执行p.start()
if __name__ == '__main__':
task_start('./testData.txt')#读取指定文件细心的同学会发现:上面的例子中隐藏了一个问题:进程的数量会随着url数量的增加而不断增加,我们在这里不使用进程池multiprocessing.Pool来控制进程数量的原因是multiprocessing.Pool和gevent有冲突不能同时使用,但是有兴趣的同学可以研究一下gevent.pool这个协程池。
另外还有一个问题:每个进程处理的url是累积的而不是独立的,例如第一个进程会处理10W个,第二个进程会变成20W个,以此类推。最后定位到问题是gevent.joinall()导致的问题,有兴趣的同学可以研究一下为什么会这样。不过这个问题的处理方案是:主进程只负责读取url然后写入到list中,在创建子进程的时候直接把list传给子进程,由子进程自己去构建协程。这样就不会出现累加的问题

5. python 多进程的顺序问题

因为进程池一次只能运行4个进程,0,1,2,3是四个进程同时执行,那么4只能等待。当进程池中任意一个进程结束后,4立即执行,所以在0结束后4开始执行,接着1,2,3陆续结束,4最后结束。

6. 为什么在Python里推荐使用多进程而不是多线程

监控一个信号就起一个线程与进程处理。这样的逻辑是不太合适的。所有的资源都是有限的,如果这样浪费很快会资源管理失控。

常规的做法是起一个线程池,或者是进程池。 使用线程还是进程取决于你处理的信号的类型。如果计算量大,则需要进程池,如果只是设备等待,比如网络数据收发,则线程也勉强够用。

信号过来后处理方法有两种,一种是实时处理,这个没有好办法,可以用“微线程”的办法做,尽量减少处理周期。另外一种是允许少量的延迟。那么通常的做法是用队列。将信号放到线程或者是进程池的消息队列里。然后再由后者分配。

还有一种高效的处理方法,根据信号的值做hash,然后自动分发到不同的CPU或者是服务器。这个就算是大规模并发处理机制。

通常情况下,比如一个WEB服务器,它需要获取一个请求,然后处理响应,可以使用线程模型,或者是进程模型。也是使用典型的池的方法。一个Pool的大于,取决于你的计算 机的计算 能力,内存大小,以及你的并发访问数量。

所要要启用多少个呢?假设你的一个信号的处理周期是1秒,你同时有100个信号进来,那么就需要100个线程或者是进程。

7. python中多进程+协程的使用以及为什么要用它

前面讲了为什么python里推荐用多进程而不是多线程,但是多进程也有其自己的限制:相比线程更加笨重、切换耗时更长,并且在python的多进程下,进程数量不推荐超过CPU核心数(一个进程只有一个GIL,所以一个进程只能跑满一个CPU),因为一个进程占用一个CPU时能充分利用机器的性能,但是进程多了就会出现频繁的进程切换,反而得不偿失。

不过特殊情况(特指IO密集型任务)下,多线程是比多进程好用的。

举个例子:给你200W条url,需要你把每个url对应的页面抓取保存起来,这种时候,单单使用多进程,效果肯定是很差的。为什么呢?

例如每次请求的等待时间是2秒,那么如下(忽略cpu计算时间):

1、单进程+单线程:需要2秒*200W=400W秒==1111.11个小时==46.3天,这个速度明显是不能接受的

2、单进程+多线程:例如我们在这个进程中开了10个多线程,比1中能够提升10倍速度,也就是大约4.63天能够完成200W条抓取,请注意,这里的实际执行是:线程1遇见了阻塞,CPU切换到线程2去执行,遇见阻塞又切换到线程3等等,10个线程都阻塞后,这个进程就阻塞了,而直到某个线程阻塞完成后,这个进程才能继续执行,所以速度上提升大约能到10倍(这里忽略了线程切换带来的开销,实际上的提升应该是不能达到10倍的),但是需要考虑的是线程的切换也是有开销的,所以不能无限的启动多线程(开200W个线程肯定是不靠谱的)

3、多进程+多线程:这里就厉害了,一般来说也有很多人用这个方法,多进程下,每个进程都能占一个cpu,而多线程从一定程度上绕过了阻塞的等待,所以比单进程下的多线程又更好使了,例如我们开10个进程,每个进程里开20W个线程,执行的速度理论上是比单进程开200W个线程快10倍以上的(为什么是10倍以上而不是10倍,主要是cpu切换200W个线程的消耗肯定比切换20W个进程大得多,考虑到这部分开销,所以是10倍以上)。

还有更好的方法吗?答案是肯定的,它就是:

4、协程,使用它之前我们先讲讲what/why/how(它是什么/为什么用它/怎么使用它)

what:

协程是一种用户级的轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

在并发编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。

why:

目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。

不管是进程还是线程,每次阻塞、切换都需要陷入系统调用(system call),先让CPU跑操作系统的调度程序,然后再由调度程序决定该跑哪一个进程(线程)。
而且由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点)。

因为协程是用户自己来编写调度逻辑的,对CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。

how:

python里面怎么使用协程?答案是使用gevent,使用方法:看这里

使用协程,可以不受线程开销的限制,我尝试过一次把20W条url放在单进程的协程里执行,完全没问题。

所以最推荐的方法,是多进程+协程(可以看作是每个进程里都是单线程,而这个单线程是协程化的)

多进程+协程下,避开了CPU切换的开销,又能把多个CPU充分利用起来,这种方式对于数据量较大的爬虫还有文件读写之类的效率提升是巨大的。

小例子:

[python]view plain

8. 请教python高手有关多进程池的问题

和那个没关系,是因为进程执行的是文件级别的,你把print放到if __name__==main下面就解决了,别停他们瞎说

9. python 多线程与多进程问题

监控一个信号就起一个线程与进程处理。这样的逻辑是不太合适的。所有的资源都是有限的,如果这样浪费很快会资源管理失控。

常规的做法是起一个线程池,或者是进程池。 使用线程还是进程取决于你处理的信号的类型。如果计算量大,则需要进程池,如果只是设备等待,比如网络数据收发,则线程也勉强够用。

信号过来后处理方法有两种,一种是实时处理,这个没有好办法,可以用“微线程”的办法做,尽量减少处理周期。另外一种是允许少量的延迟。那么通常的做法是用队列。将信号放到线程或者是进程池的消息队列里。然后再由后者分配。

还有一种高效的处理方法,根据信号的值做hash,然后自动分发到不同的CPU或者是服务器。这个就算是大规模并发处理机制。

通常情况下,比如一个WEB服务器,它需要获取一个请求,然后处理响应,可以使用线程模型,或者是进程模型。也是使用典型的池的方法。一个Pool的大于,取决于你的计算 机的计算 能力,内存大小,以及你的并发访问数量。

所要要启用多少个呢?假设你的一个信号的处理周期是1秒,你同时有100个信号进来,那么就需要100个线程或者是进程。

线程数过多,表面上处理能力在增加,不过延迟也在增加,失败率也会增加。

阅读全文

与python多进程为什么要用进程池相关的资料

热点内容
你们有什么好的解压软件 浏览:605
常州空气压缩机厂家 浏览:239
安卓如何关闭app内弹出的更新提示 浏览:407
e4a写的app怎么装苹果手机 浏览:199
海立压缩机海信系 浏览:208
社保如何在app上合并 浏览:220
小米加密照片后缀 浏览:234
我的世界网易手机怎么创服务器 浏览:978
载入单页源码 浏览:930
阿里云服务器seo 浏览:777
海洋斗什么时候上线安卓 浏览:86
中行app如何查每日汇款限额 浏览:840
输入服务器sn是什么意思 浏览:725
sha1算法java 浏览:90
asp代码压缩 浏览:851
按键压枪源码 浏览:180
福建服务器负载均衡是什么 浏览:697
算法将所有的岛屿连通 浏览:313
51单片机40引脚是什么 浏览:536
手机文件夹大小怎么调节 浏览:309