导航:首页 > 编程语言 > python多进程同步

python多进程同步

发布时间:2023-09-25 20:53:57

python 多进程

基于官方文档:
https://docs.python.org/zh-cn/3/library/multiprocessing.html
日乐购,刚才看到的一个博客,写的都不太对,还是基于官方的比较稳妥
我就是喜欢抄官方的,哈哈

通常我们使用Process实例化一个进程,并调用 他的 start() 方法启动它。
这种方法和 Thread 是一样的。

上图中,我写了 p.join() 所以主进程是 等待 子进程执行完后,才执行 print("运行结束")
否则就是反过来了(这个不一定,看你的语句了,顺序其实是随机的)例如:

主进加个 sleep

所以不加join() ,其实子进程和主进程是各干各的,谁也不等谁。都执行完后,文件运行就结束了

上面我们用了 os.getpid() 和 os.getppid() 获取 当前进程,和父进程的id
下面就讲一下,这两个函数的用法:
os.getpid()
返回当前进程的id
os.getppid()
返回父进程的id。 父进程退出后,unix 返回初始化进程(1)中的一个
windows返回相同的id (可能被其他进程使用了)
这也就解释了,为啥我上面 的程序运行多次, 第一次打印的parentid 都是 14212 了。
而子进程的父级 process id 是调用他的那个进程的 id : 1940

视频笔记:
多进程:使用大致方法:

参考: 进程通信(pipe和queue)

pool.map (函数可以有return 也可以共享内存或queue) 结果直接是个列表

poll.apply_async() (同map,只不过是一个进程,返回结果用 xx.get() 获得)

报错:

参考 : https://blog.csdn.net/xiemanR/article/details/71700531

把 pool = Pool() 放到 if name == " main ": 下面初始化搞定。
结果:

这个肯定有解释的

测试多进程计算效果:
进程池运行:

结果:

普通计算:

我们同样传入 1 2 10 三个参数测试:

其实对比下来开始快了一半的;
我们把循环里的数字去掉一个 0;
单进程:

多进程:

两次测试 单进程/进程池 分别为 0.669 和 0.772 几乎成正比的。
问题 二:
视图:
post 视图里面

Music 类:

直接报错:

写在 类里面也 在函数里用 self.pool 调用也不行,也是相同的错误。

最后 把 pool = Pool 直接写在 search 函数里面,奇迹出现了:

前台也能显示搜索的音乐结果了

总结一点,进程这个东西,最好 写在 直接运行的函数里面,而不是 一个函数跳来跳去。因为最后可能 是在子进程的子进程运行的,这是不许的,会报错。
还有一点,多进程运行的函数对象,不能是 lambda 函数。也许lambda 虚拟,在内存??

使用 pool.map 子进程 函数报错,导致整个 pool 挂了:
参考: https://blog.csdn.net/hedongho/article/details/79139606
主要你要,对函数内部捕获错误,而不能让异常抛出就可以了。

关于map 传多个函数参数
我一开始,就是正常思维,多个参数,搞个元祖,让参数一一对应不就行了:

报错:

参考:
https://blog.csdn.net/qq_15969343/article/details/84672527
普通的 process 当让可以穿多个参数,map 却不知道咋传的。
apply_async 和map 一样,不知道咋传的。

最简单的方法:
使用 starmap 而不是 map

结果:
子进程结束
1.8399453163146973
成功拿到结果了

关于map 和 starmap 不同的地方看源码

关于apply_async() ,我没找到多参数的方法,大不了用 一个迭代的 starmap 实现。哈哈

关于 上面源码里面有 itertools.starmap
itertools 用法参考:
https://docs.python.org/zh-cn/3/library/itertools.html#itertool-functions

有个问题,多进程最好不要使用全部的 cpu , 因为这样可能影响其他任务,所以 在进程池 添加 process 参数 指定,cpu 个数:

上面就是预留了 一个cpu 干其他事的

后面直接使用 Queue 遇到这个问题:

解决:
Manager().Queue() 代替 Queue()

因为 queue.get() 是堵塞型的,所以可以提前判断是不是 空的,以免堵塞进程。比如下面这样:
使用 queue.empty() 空为True

❷ Python入门系列(十二)——GUI+多进程

话说,python做图形界面并不明智,效率并不高。但在某些特殊需求下还是需要我们去使用,所以python拥有多个第三方库用以实现GUI,本章我们使用python基本模块tkinter进行学习,因为需求并不大,所以不做太多拓展。
继续改写上一章的IP查询系统(= =,要玩烂了),首先略改下IpWhere.py以备调用~

然后使用tkinter模块进行图形界面的实现,调用预编译的IpWhere模块 :

额,太丑了,但基本实现我们小小的需求,在以后的py学习中,我们再涉及其他的第三方模块,此处就当是入门了解吧。

十分抱歉把这么重要的内容放在最后,要不是大佬指点,此次学习可能就要错过多进程的问题了。
Unix系统提供了forx,python可借助os模块调用,从而实现多进程,然而windows系统并不具备,所以我们选择python内置的multiprocessing多进程模块进行学习。

首先我们借助直接调用多进程来改写下我们在多线程章节用到的例子!

显然,这么写实在太蠢了,如果我们的任务量巨大,这并不合适。所以我们引入了进程池的概念,使用进程池进行改写:

在此,我们可以看到所有进程是并发执行的,同样,我们在多线程章节就讲过,主进程的结束意味着程序退出,所以我们需要借助join()方法堵塞进程。

我们知道线程共享内存空间,而进程的内存是独立的,同一个进程的线程之间可以直接交流,也就带来了线程同步的苦恼,这个我们在多线程章节已经讲过了;而两个进程想通信,则必须通过一个中间代理来实现,即我们接下来的内容:进程间通信。

进程之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。我们接下来就以Queue的方式进行学习。

Queue.Queue是进程内非阻塞队列,multiprocess.Queue是跨进程通信队列,前者是各自私有,后者是各子进程共有。

还有一个在后者基础上进行封装的multiprocess.Manager.Queue()方法,如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则会得到一条如下的错误信息: RuntimeError: Queue objects should only be shared between processes through inheritance.

接下来我们就借助进程池来进行多进程操作的改写,感谢大佬一路辅导。

我们可以看到两个子线程先执行,然后一个子线程单独执行,此处有意而为之,让大家更清晰的了解队列的使用。期间有一处我们放弃使用jion()方法堵塞,而是自己写了个循环堵塞,大家根据自己习惯来就好。

话说,真的没人吐槽么?上面的例子从需求上来讲,完全就不需要多线程好不好!emmmm,我们来点实力拓展,写一个有智商的多线程脚本,顺便结合上一节的web来一个综合篇,随便找个现实需求吧!

emmm,比如我们来到当当网买书,搜一下我们想要的书籍,发现!!太多了!!真J2乱!!看不过来!!不想翻页!!直接告诉我哪个便宜、哪个牛逼好不好!!

简单看下这个url:
http://search.dangdang.com/?key=渗透测试&ddsale=1&page_index=2
其中ddsale参数代表当当自营,page_index代表页数,key代表搜索内容,我们本次的变量只有页数。

所以我们构造请求的url为:
'http://search.dangdang.com/?key=渗透测试&ddsale=1&page_index='+str(page)
如果修改的内容不使用str字符串转化,会收到如下报错:
TypeError: can only concatenate str (not "int") to str
然后我们看一下页面内容的分布情况,本次我们关心卖什么书,卖多少钱?

对应的编写我们的正则匹配规则,当然了,有更简便的第三方库可以帮我们处理,但为了更好的形成流程性认识,我们这里依然使用正则。
我们对应我们需要的书籍名称和当前价格匹配如下:
<a title=" (.*?)" ddclick=
<span class="search_now_price">¥(.*?)</span>
那么,思路理清了,我们就开始使用多线程来写我们的小系统~

然后我们去查看一下我们的结果文件~

现在这个小系统具备的功能就是根据用户需要选择要检索的书籍,然后整理下名称和价格,开了10个线程,如果小伙伴pc给力的话可以继续加。简单的异常处理机制和界面交互,基本满足日常所需。

❸ python中多进程+协程的使用以及为什么要用它

前面讲了为什么python里推荐用多进程而不是多线程,但是多进程也有其自己的限制:相比线程更加笨重、切换耗时更长,并且在python的多进程下,进程数量不推荐超过CPU核心数(一个进程只有一个GIL,所以一个进程只能跑满一个CPU),因为一个进程占用一个CPU时能充分利用机器的性能,但是进程多了就会出现频繁的进程切换,反而得不偿失。

不过特殊情况(特指IO密集型任务)下,多线程是比多进程好用的。

举个例子:给你200W条url,需要你把每个url对应的页面抓取保存起来,这种时候,单单使用多进程,效果肯定是很差的。为什么呢?

例如每次请求的等待时间是2秒,那么如下(忽略cpu计算时间):

1、单进程+单线程:需要2秒*200W=400W秒==1111.11个小时==46.3天,这个速度明显是不能接受的

2、单进程+多线程:例如我们在这个进程中开了10个多线程,比1中能够提升10倍速度,也就是大约4.63天能够完成200W条抓取,请注意,这里的实际执行是:线程1遇见了阻塞,CPU切换到线程2去执行,遇见阻塞又切换到线程3等等,10个线程都阻塞后,这个进程就阻塞了,而直到某个线程阻塞完成后,这个进程才能继续执行,所以速度上提升大约能到10倍(这里忽略了线程切换带来的开销,实际上的提升应该是不能达到10倍的),但是需要考虑的是线程的切换也是有开销的,所以不能无限的启动多线程(开200W个线程肯定是不靠谱的)

3、多进程+多线程:这里就厉害了,一般来说也有很多人用这个方法,多进程下,每个进程都能占一个cpu,而多线程从一定程度上绕过了阻塞的等待,所以比单进程下的多线程又更好使了,例如我们开10个进程,每个进程里开20W个线程,执行的速度理论上是比单进程开200W个线程快10倍以上的(为什么是10倍以上而不是10倍,主要是cpu切换200W个线程的消耗肯定比切换20W个进程大得多,考虑到这部分开销,所以是10倍以上)。

还有更好的方法吗?答案是肯定的,它就是:

4、协程,使用它之前我们先讲讲what/why/how(它是什么/为什么用它/怎么使用它)

what:

协程是一种用户级的轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

在并发编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。

why:

目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。

不管是进程还是线程,每次阻塞、切换都需要陷入系统调用(system call),先让CPU跑操作系统的调度程序,然后再由调度程序决定该跑哪一个进程(线程)。
而且由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点)。

因为协程是用户自己来编写调度逻辑的,对CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。

how:

python里面怎么使用协程?答案是使用gevent,使用方法:看这里

使用协程,可以不受线程开销的限制,我尝试过一次把20W条url放在单进程的协程里执行,完全没问题。

所以最推荐的方法,是多进程+协程(可以看作是每个进程里都是单线程,而这个单线程是协程化的)

多进程+协程下,避开了CPU切换的开销,又能把多个CPU充分利用起来,这种方式对于数据量较大的爬虫还有文件读写之类的效率提升是巨大的。

小例子:

[python]view plain

❹ Python中的多进程与多线程/分布式该如何使用

Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情。
借助这个包,可以轻松完成从单进程到并发执行的转换。
1、新建单一进程
如果我们新建少量进程,可以如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=("hello", ))
p.start()
p.join()
print "Sub-process done."12345678910111213
2、使用进程池
是的,你没有看错,不是线程池。它可以让你跑满多核CPU,而且使用方法非常简单。
注意要用apply_async,如果落下async,就变成阻塞版本了。
processes=4是最多并发进程数量。
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."12345678910111213141516
3、使用Pool,并需要关注结果
更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
return "done " + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print res.get()
print "Sub-process(es) done."
2014.12.25更新
根据网友评论中的反馈,在Windows下运行有可能崩溃(开启了一大堆新窗口、进程),可以通过如下调用来解决:
multiprocessing.freeze_support()1
附录(自己的脚本):
#!/usr/bin/python
import threading
import subprocess
import datetime
import multiprocessing
def dd_test(round, th):
test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)
command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg
print command
subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)
def mds_stat(round):
p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)
out = p.stdout.readlines()
if out[0].find('active') != -1:
command = "echo '0205pm %s round mds status OK, %s' >> /round_record" %(round, datetime.datetime.now())
command_2 = "time (ls /zbkc/test_mds_crash/) 2>>/round_record"
command_3 = "ls /zbkc/test_mds_crash | wc -l >> /round_record"
subprocess.call(command,shell=True)
subprocess.call(command_2,shell=True)
subprocess.call(command_3,shell=True)
return 1
else:
command = "echo '0205 %s round mds status abnormal, %s, %s' >> /round_record" %(round, out[0], datetime.datetime.now())
subprocess.call(command,shell=True)
return 0
#threads = []
for round in range(1, 1600):
pool = multiprocessing.Pool(processes = 10) #使用进程池
for th in range(10):
# th_name = "thread-" + str(th)
# threads.append(th_name) #添加线程到线程列表
# threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #创建多线程任务
pool.apply_async(dd_test, (round, th))
pool.close()
pool.join()
#等待线程完成
# for t in threads:
# t.join()
if mds_stat(round) == 0:
subprocess.call("zbkc -s",shell=True)
break

❺ python中的进程-实战部分

如果想了解进程 可以先看一下这一篇 python中的进程-理论部分

python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

创建进程的类

参数介绍:

group参数未使用,值始终为None

target表示调用对象,即子进程要执行的任务

args表示调用对象的位置参数元组,args=(1,2,'tiga',)

kwargs表示调用对象的字典,kwargs={'name':'tiga','age':18}

name为子进程的名称

方法介绍:

p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法

p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True

p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

属性介绍:

注意:在windows中Process()必须放到# if __name__ == '__main__':下

创建并开启子进程的两种方式

方法一:


方法二:

有了join,程序不就是串行了吗???

terminate与is_alive

name与pid

阅读全文

与python多进程同步相关的资料

热点内容
数字还可以怎样加密 浏览:114
为什么安卓没白鸟 浏览:235
程序员投行 浏览:313
java多线程读取文件 浏览:145
香港外贸服务器有什么好处 浏览:614
邓伦参加密室大逃脱结果变成团宠 浏览:847
购买文件服务器怎么选择 浏览:720
空调压缩机高压报警 浏览:498
u盘数控程序放哪个文件夹 浏览:853
python模拟微信登录其他APP 浏览:301
绑扎钢筋加密区规范 浏览:671
怎么更换手机壁纸安卓 浏览:808
闲鱼app卖手机怎么走验机 浏览:821
安卓三个按键音怎么关闭 浏览:64
esp8266手机app源码 浏览:713
服务器如何建立多个站点 浏览:151
加密狗可以在笔记本上做账吗 浏览:888
学生云服务器推荐 浏览:509
android银行卡快捷支付 浏览:828
海口手机直播系统源码 浏览:416