导航:首页 > 编程语言 > python多进程读取文件

python多进程读取文件

发布时间:2023-05-08 05:28:25

python中多进程+协程的使用以及为什么要用它

前面讲了为什么python里推荐用多进程而不是多线程,但是多进程也有其自己的限制:相比线程更加笨重、切换耗时更长,并且在python的多进程下,进程数量不推荐超过CPU核心数(一个进程只有一个GIL,所以一个进程只能跑满一个CPU),因为一个进程占用一个CPU时能充分利用机器的性能,但是进程多了就会出现频繁的进程切换,反而得不偿失。

不过特殊情况(特指IO密集型任务)下,多线程是比多进程好用的。

举个例子:给你200W条url,需要你把每个url对应的页面抓取保存起来,这种时候,单单使用多进程,效果肯定是很差的。为什么呢?

例如每次请求的等待时间是2秒,那么如下(忽略cpu计算时间):

1、单进程+单线程:需要2秒*200W=400W秒==1111.11个小时==46.3天,这个速度明显是不能接受的

2、单进程+多线程:例如我们在这个进程中开了10个多线程,比1中能够提升10倍速度,也就是大约4.63天能够完成200W条抓取,请注意,这里的实际执行是:线程1遇见了阻塞,CPU切换到线程2去执行,遇见阻塞又切换到线程3等等,10个线程都阻塞后,这个进程就阻塞了,而直到某个线程阻塞完成后,这个进程才能继续执行,所以速度上提升大约能到10倍(这里忽略了线程切换带来的开销,实际上的提升应该是不能达到10倍的),但是需要考虑的是线程的切换也是有开销的,所以不能无限的启动多线程(开200W个线程肯定是不靠谱的)

3、多进程+多线程:这里就厉害了,一般来说也有很多人用这个方法,多进程下,每个进程都能占一个cpu,而多线程从一定程度上绕过了阻塞的等待,所以比单进程下的多线程又更好使了,例如我们开10个进程,每个进程里开20W个线程,执行的速度理论上是比单进程开200W个线程快10倍以上的(为什么是10倍以上而不是10倍,主要是cpu切换200W个线程的消耗肯定比切换20W个进程大得多,考虑到这部分开销,所以是10倍以上)。

还有更好的方法吗?答案是肯定的,它就是:

4、协程,使用它之前我们先讲讲what/why/how(它是什么/为什么用它/怎么使用它)

what:

协程是一种用户级的轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

在并发编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。

why:

目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。

不管是进程还是线程,每次阻塞、切换都需要陷入系统调用(system call),先让CPU跑操作系统的调度程序,然后再由调度程序决定该跑哪一个进程(线程)。
而且由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点)。

因为协程是用户自己来编写调度逻辑的,对CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。

how:

python里面怎么使用协程?答案是使用gevent,使用方法:看这里

使用协程,可以不受线程开销的限制,我尝试过一次把20W条url放在单进程的协程里执行,完全没问题。

所以最推荐的方法,是多进程+协程(可以看作是每个进程里都是单线程,而这个单线程是协程化的)

多进程+协程下,避开了CPU切换的开销,又能把多个CPU充分利用起来,这种方式对于数据量较大的爬虫还有文件读写之类的效率提升是巨大的。

小例子:

[python]view plain

Ⅱ Python多进程运行——Multiprocessing基础教程2

上篇文章简单介绍了multiprocessing模块,本文将要介绍进程之间的数据共享和信息传递的概念。

在多进程处理中,所有新创建的进程都会有这两个特点:独立运行,有自己的内存空间。

我们来举个例子展示一下:

这个程序的输出结果是:

在上面的程序中我们尝试在两个地方打印全局列表result的内容:

我们再用一张图来帮助理解记忆不同进程间的数据关系:

如果程序需要在不同的进程之间共享一些数据的话,该怎么做呢?不用担心,multiprocessing模块提供了Array对象和Value对象,用来在进程之间共享数据。

所谓Array对象和Value对象分别是指从共享内存中分配的ctypes数组和对象。我们直接来看一个例子,展示如何用Array对象和Value对象在进程之间共享数据:

程序输出的结果如下:

成功了!主程序和p1进程输出了同样的结果,说明程序中确实完成了不同进程间的数据共享。那么我们来详细看一下上面的程序做了什么:

在主程序中我们首先创建了一个Array对象:

向这个对象输入的第一个参数是数据类型:i表示整数,d代表浮点数。第二个参数是数组的大小,在这个例子中我们创建了包含4个元素的数组。

类似的,我们创建了一个Value对象:

我们只对Value对象输入了一个参数,那就是数据类型,与上述的方法一致。当然,我们还可以对其指定一个初始值(比如10),就像这样:

随后,我们在创建进程对象时,将刚创建好的两个对象:result和square_sum作为参数输入给进程:

在函数中result元素通过索引进行数组赋值,square_sum通过 value 属性进行赋值。

注意:为了完整打印result数组的结果,需要使用 result[:] 进行打印,而square_sum也需要使用 value 属性进行打印:

每当python程序启动时,同时也会启动一个服务器进程。随后,只要我们需要生成一个新进程,父进程就会连接到服务器并请求它派生一个新进程。这个服务器进程可以保存Python对象,并允许其他进程使用代理来操作它们。

multiprocessing模块提供了能够控制服务器进程的Manager类。所以,Manager类也提供了一种创建可以在不同流程之间共享的数据的方法。

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型,如列表、字典、队列、值、数组等。此外,单个管理器可以由网络上不同计算机上的进程共享。

但是,服务器进程管理器的速度比使用共享内存要慢。

让我们来看一个例子:

这个程序的输出结果是:

我们来理解一下这个程序做了什么:首先我们创建了一个manager对象

在with语句下的所有行,都是在manager对象的范围内的。接下来我们使用这个manager对象创建了列表(类似的,我们还可以用 manager.dict() 创建字典)。

最后我们创建了进程p1(用于在records列表中插入一条新的record)和p2(将records打印出来),并将records作为参数进行传递。

服务器进程的概念再次用下图总结一下:

为了能使多个流程能够正常工作,常常需要在它们之间进行一些通信,以便能够划分工作并汇总最后的结果。multiprocessing模块支持进程之间的两种通信通道:Queue和Pipe。

使用队列来回处理多进程之间的通信是一种比较简单的方法。任何Python对象都可以使用队列进行传递。我们来看一个例子:

上面这个程序的输出结果是:

我们来看一下上面这个程序到底做了什么。首先我们创建了一个Queue对象:

然后,将这个空的Queue对象输入square_list函数。该函数会将列表中的数平方,再使用 put() 方法放入队列中:

随后使用 get() 方法,将q打印出来,直至q重新称为一个空的Queue对象:

我们还是用一张图来帮助理解记忆:

一个Pipe对象只能有两个端点。因此,当进程只需要双向通信时,它会比Queue对象更好用。

multiprocessing模块提供了 Pipe() 函数,该函数返回由管道连接的一对连接对象。 Pipe() 返回的两个连接对象分别表示管道的两端。每个连接对象都有 send() 和 recv() 方法。

我们来看一个例子:

上面这个程序的输出结果是:

我们还是来看一下这个程序到底做了什么。首先创建了一个Pipe对象:

与上文说的一样,该对象返回了一对管道两端的两个连接对象。然后使用 send() 方法和 recv() 方法进行信息的传递。就这么简单。在上面的程序中,我们从一端向另一端发送一串消息。在另一端,我们收到消息,并在收到END消息时退出。

要注意的是,如果两个进程(或线程)同时尝试从管道的同一端读取或写入管道中的数据,则管道中的数据可能会损坏。不过不同的进程同时使用管道的两端是没有问题的。还要注意,Queue对象在进程之间进行了适当的同步,但代价是增加了计算复杂度。因此,Queue对象对于线程和进程是相对安全的。

最后我们还是用一张图来示意:

Python的multiprocessing模块还剩最后一篇文章:多进程的同步与池化

敬请期待啦!

Ⅲ python 多进程读取同一个循环处理、可以用multiprocessing

可以每个在func中加上一个参数data,data是这个线程处理的数据;

多线程处理的时候,给每个线程分配相应的data就可以了。


给个示例:

#-*-coding:utf-8-*-
importthread,threading
importtime

defFuncTest(tdata):
printtdata

classmythread(threading.Thread):
def__init__(self,threadname):
threading.Thread.__init__(self)

defrun(self):
lock.acquire()
FuncTest(ft)
lock.release()

defMutiThread(num):
threads=[]
i=0
globalft
forxinxrange(num):
threads.append(mythread(num))
fortinthreads:
time.sleep(0.5)
lock.acquire()
ft=GetThreadParam(datafile,num,i)
#print'[%s]Thread:%s,Testdata:%s'%(time.ctime(),t,ft)
i=i+1
t.start()
lock.release()
fortinthreads:
t.join()

defGetThreadParam(datafile,num,curthread):
#线程数需要小于文件行数
f=open(datafile,'r')
lines=f.readlines()
divres=divmod(len(lines),num)
ifcurthread<(num-1):
res=lines[curthread*divres[0]:(curthread+1)*divres[0]]
elifcurthread==(num-1):
res=lines[curthread*divres[0]:((curthread+1)*divres[0]+divres[1])]
returnres
f.close()

if__name__=='__main__':

globalnum,lock
datafile='a.txt'

num=3#num并发数

lock=threading.Lock()
MutiThread(num)

a.txt文件内容如下

1

2

3

4

5

6

7

8

9

10


3个线程并发时,运行结果:

>>>

['1 ', '2 ', '3 ']

['4 ', '5 ', '6 ']

['7 ', '8 ', '9 ', '10']

Ⅳ python读取大文件处理时使用多线程

如果有个很大的文件,几十G?,需要每次读取一部分,处理后再读取剩余部分。
with open as f 已经从内部处理难点,使用 for line in f 以迭代器的形式每次读取一行,不会有内存问题。

下面程序的思路是用一个列表存放读取到的数据,达到长度后就开始处理,处理完就清空列表,继续执行

Ⅳ 关于windows下使用python 多进程拷贝文件时出现拷贝空文件夹的问题

代码有点多,没看完。但有两点疑问:

Ⅵ python 多进程

基于官方文档:
https://docs.python.org/zh-cn/3/library/multiprocessing.html
日乐购,刚才看到的一个博客,写的都不太对,还是基于官方的比较稳妥
我就是喜欢抄官方的,哈哈

通常我们使用Process实例化一个进程,并调用 他的 start() 方法启动它。
这种方法和 Thread 是一样的。

上图中,我写了 p.join() 所以主进程是 等待 子进程执行完后,才执行 print("运行结束")
否则就是反过来了(这个不一定,看你的语句了,顺序其实是随机的)例如:

主进加个 sleep

所以不加join() ,其实子进程和主进程是各干各的,谁也不等谁。都执行完后,文件运行就结束了

上面我们用了 os.getpid() 和 os.getppid() 获取 当前进程,和父进程的id
下面就讲一下,这两个函数的用法:
os.getpid()
返回当前进程的id
os.getppid()
返回父进程的id。 父进程退出后,unix 返回初始化进程(1)中的一个
windows返回相同的id (可能被其他进程使用了)
这也就解释了,为啥我上面 的程序运行多次, 第一次打印的parentid 都是 14212 了。
而子进程的父级 process id 是调用他的那个进程的 id : 1940

视频笔记:
多进程:使用大致方法:

参考: 进程通信(pipe和queue)

pool.map (函数可以有return 也可以共享内存或queue) 结果直接是个列表

poll.apply_async() (同map,只不过是一个进程,返回结果用 xx.get() 获得)

报错:

参考 : https://blog.csdn.net/xiemanR/article/details/71700531

把 pool = Pool() 放到 if name == " main ": 下面初始化搞定。
结果:

这个肯定有解释的

测试多进程计算效果:
进程池运行:

结果:

普通计算:

我们同样传入 1 2 10 三个参数测试:

其实对比下来开始快了一半的;
我们把循环里的数字去掉一个 0;
单进程:

多进程:

两次测试 单进程/进程池 分别为 0.669 和 0.772 几乎成正比的。
问题 二:
视图:
post 视图里面

Music 类:

直接报错:

写在 类里面也 在函数里用 self.pool 调用也不行,也是相同的错误。

最后 把 pool = Pool 直接写在 search 函数里面,奇迹出现了:

前台也能显示搜索的音乐结果了

总结一点,进程这个东西,最好 写在 直接运行的函数里面,而不是 一个函数跳来跳去。因为最后可能 是在子进程的子进程运行的,这是不许的,会报错。
还有一点,多进程运行的函数对象,不能是 lambda 函数。也许lambda 虚拟,在内存??

使用 pool.map 子进程 函数报错,导致整个 pool 挂了:
参考: https://blog.csdn.net/hedongho/article/details/79139606
主要你要,对函数内部捕获错误,而不能让异常抛出就可以了。

关于map 传多个函数参数
我一开始,就是正常思维,多个参数,搞个元祖,让参数一一对应不就行了:

报错:

参考:
https://blog.csdn.net/qq_15969343/article/details/84672527
普通的 process 当让可以穿多个参数,map 却不知道咋传的。
apply_async 和map 一样,不知道咋传的。

最简单的方法:
使用 starmap 而不是 map

结果:
子进程结束
1.8399453163146973
成功拿到结果了

关于map 和 starmap 不同的地方看源码

关于apply_async() ,我没找到多参数的方法,大不了用 一个迭代的 starmap 实现。哈哈

关于 上面源码里面有 itertools.starmap
itertools 用法参考:
https://docs.python.org/zh-cn/3/library/itertools.html#itertool-functions

有个问题,多进程最好不要使用全部的 cpu , 因为这样可能影响其他任务,所以 在进程池 添加 process 参数 指定,cpu 个数:

上面就是预留了 一个cpu 干其他事的

后面直接使用 Queue 遇到这个问题:

解决:
Manager().Queue() 代替 Queue()

因为 queue.get() 是堵塞型的,所以可以提前判断是不是 空的,以免堵塞进程。比如下面这样:
使用 queue.empty() 空为True

Ⅶ python中多进程+协程的使用以及为什么要用它

python是一款应用非常广泛的脚本程序语言,谷歌公司的网页就是用拆埋python编写。python在生物信息、统计、网页制作、计算等多个领域都体现出了强大的功能。python和其他脚本语言如java、R、Perl 一样,都可以直接在命令行里运行脚本程序。工具/原料
python;CMD命令行;windows操作系统
方法/步骤
1、首先下载安装python,建议安装2.7版本以上,3.0版本以下,由于3.0版本以上不向下兼容,体验较差。

2、打开文本编辑器,推荐editplus,notepad等,将文件保存成 .py格式,editplus和notepad支持识别python语法。
脚本第一行一定要写上 #!usr/bin/python
表示该脚本文件是可执行python脚本
如果python目录不在usr/bin目录下,则替换成当前python执行程序的目录。
3、编写完脚本之后注意调试、可以直接用editplus调试。调试方法可自行网络。脚本写完之后,打开CMD命令行,前提是python 已经被携扰加入到环境变量中,如果没有加入到环境变量,请百旅隐蚂度

4、在CMD命令行中,输入 “python” + “空格”,即 ”python “;将已经写好的脚本文件拖拽到当前光标位置,然后敲回车运行即可。

Ⅷ python可以多进程吗

想要充分利用多核CPU资源,Python中大部分情况下都需要使用多进程,Python中提供了multiprocessing这个包实现多进程。multiprocessing支持子进程、进程间的同步与通信,提供了Process、Queue、Pipe、Lock等组件。

开辟子进程
multiprocessing中提供了Process类来生成进程实例

Process([group [, target [, name [, args [, kwargs]]]]])
group分组,实际上不使用
target表示调用对象,你可以传入方法的名字
args表示给调用对象以元组的形式提供参数,比如target是函数a,他有两个参数m,n,那么该参数为args=(m, n)即可
kwargs表示调用对象的字典
name是别名,相当于给这个进程取一个名字
先来个小例子:

# -*- coding:utf-8 -*-
from multiprocessing import Process, Pool
import os
import time

def run_proc(wTime):
n = 0
while n < 3:
print "subProcess %s run," % os.getpid(), "{0}".format(time.ctime()) #获取当前进程号和正在运行是的时间
time.sleep(wTime) #等待(休眠)
n += 1

if __name__ == "__main__":
p = Process(target=run_proc, args=(2,)) #申请子进程
p.start() #运行进程
print "Parent process run. subProcess is ", p.pid
print "Parent process end,{0}".format(time.ctime())
运行结果:

Parent process run. subProcess is 30196
Parent process end,Mon Mar 27 11:20:21 2017
subProcess 30196 run, Mon Mar 27 11:20:21 2017
subProcess 30196 run, Mon Mar 27 11:20:23 2017
subProcess 30196 run, Mon Mar 27 11:20:25 2017

根据运行结果可知,父进程运行结束后子进程仍然还在运行,这可能造成僵尸( zombie)进程。

通常情况下,当子进程终结时,它会通知父进程,清空自己所占据的内存,并在内核里留下自己的退出信息。父进程在得知子进程终结时,会从内核中取出子进程的退出信息。但是,如果父进程早于子进程终结,这可能造成子进程的退出信息滞留在内核中,子进程成为僵尸(zombie)进程。当大量僵尸进程积累时,内存空间会被挤占。

有什么办法可以避免僵尸进程呢?
这里介绍进程的一个属性 deamon,当其值为TRUE时,其父进程结束,该进程也直接终止运行(即使还没运行完)。
所以给上面的程序加上p.deamon = true,看看效果。

# -*- coding:utf-8 -*-
from multiprocessing import Process, Pool
import os
import time

def run_proc(wTime):
n = 0
while n < 3:
print "subProcess %s run," % os.getpid(), "{0}".format(time.ctime())
time.sleep(wTime)
n += 1

if __name__ == "__main__":
p = Process(target=run_proc, args=(2,))
p.daemon = True #加入daemon
p.start()
print "Parent process run. subProcess is ", p.pid
print "Parent process end,{0}".format(time.ctime())
执行结果:

Parent process run. subProcess is 31856
Parent process end,Mon Mar 27 11:40:10 2017

这是问题又来了,子进程并没有执行完,这不是所期望的结果。有没办法将子进程执行完后才让父进程结束呢?
这里引入p.join()方法,它使子进程执行结束后,父进程才执行之后的代码

# -*- coding:utf-8 -*-
from multiprocessing import Process, Pool
import os
import time

def run_proc(wTime):
n = 0
while n < 3:
print "subProcess %s run," % os.getpid(), "{0}".format(time.ctime())
time.sleep(wTime)
n += 1

if __name__ == "__main__":
p = Process(target=run_proc, args=(2,))
p.daemon = True
p.start()
p.join() #加入join方法
print "Parent process run. subProcess is ", p.pid
print "Parent process end,{0}".format(time.ctime())
执行结果:

subProcess 32076 run, Mon Mar 27 11:46:07 2017
subProcess 32076 run, Mon Mar 27 11:46:09 2017
subProcess 32076 run, Mon Mar 27 11:46:11 2017
Parent process run. subProcess is 32076
Parent process end,Mon Mar 27 11:46:13 2017

这样所有的进程就能顺利的执行了。

Ⅸ 如何使用Python实现多进程编程

1.Process
创建进程的类:Process([group[,target[,name[,args[,kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。
属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

例1.1:创建函数并将其作为单个进程
importmultiprocessing
importtime

defworker(interval):
n=5
whilen>0:
print("Thetimeis{0}".format(time.ctime()))
time.sleep(interval)
n-=1

if__name__=="__main__":
p=multiprocessing.Process(target=worker,args=(3,))
p.start()
print"p.pid:",p.pid
print"p.name:",p.name
print"p.is_alive:",p.is_alive()
结果
12345678p.pid:8736p.name:Process-1p.is_alive:TrueThetimeisTueApr2120:55:122015ThetimeisTueApr2120:55:152015ThetimeisTueApr2120:55:182015ThetimeisTueApr2120:55:212015ThetimeisTueApr2120:55:242015

例1.2:创建函数并将其作为多个进程
importmultiprocessing
importtime

defworker_1(interval):
print"worker_1"
time.sleep(interval)
print"endworker_1"

defworker_2(interval):
print"worker_2"
time.sleep(interval)
print"endworker_2"

defworker_3(interval):
print"worker_3"
time.sleep(interval)
print"endworker_3"

if__name__=="__main__":
p1=multiprocessing.Process(target=worker_1,args=(2,))
p2=multiprocessing.Process(target=worker_2,args=(3,))
p3=multiprocessing.Process(target=worker_3,args=(4,))

p1.start()
p2.start()
p3.start()

print("ThenumberofCPUis:"+str(multiprocessing.cpu_count()))
forpinmultiprocessing.active_children():
print("childp.name:"+p.name+" p.id"+str(p.pid))
print"END!!!!!!!!!!!!!!!!!"
结果
1234567891011ThenumberofCPUis:4childp.name:Process-3p.id7992childp.name:Process-2p.id4204childp.name:Process-1p.id6380END!!!!!!!!!!!!!!!!!worker_1worker_3worker_2endworker_1endworker_2endworker_3

例1.3:将进程定义为类
importmultiprocessing
importtime

classClockProcess(multiprocessing.Process):
def__init__(self,interval):
multiprocessing.Process.__init__(self)
self.interval=interval

defrun(self):
n=5
whilen>0:
print("thetimeis{0}".format(time.ctime()))
time.sleep(self.interval)
n-=1

if__name__=='__main__':
p=ClockProcess(3)
p.start()
注:进程p调用start()时,自动调用run()
结果
12345thetimeisTueApr2120:31:302015thetimeisTueApr2120:31:332015thetimeisTueApr2120:31:362015thetimeisTueApr2120:31:392015thetimeisTueApr2120:31:422015

Ⅹ python 多进程写入同一个文件,经常报错:找不到文件,该怎么处理呢有没有大神能贴个例子之类的参考下

importthreading,time

defwrite(file,lock):
lock.acquire()#锁住
print("开始写出")
file.write("写出")
print("写出完毕")
lock.release()#解锁
returnTrue

lock=threading.Lock()#获取一个锁
file=open("1.txt")
foriinrange(100):
threading.Thread(target=write,args=(file,lock)).run()

阅读全文

与python多进程读取文件相关的资料

热点内容
什么样的源码好看 浏览:154
手机主服务器有什么用 浏览:610
程序编写命令 浏览:597
android发送心跳包 浏览:385
指标源码和原理 浏览:700
汽车空调压缩吸盘 浏览:208
崽崽因app版本不同不能邀请怎么办 浏览:686
poa算法得到的解为全局最优解 浏览:926
python符号表达式 浏览:34
威驰压缩机继电器 浏览:871
华为手机怎么设置移动数据app 浏览:959
空调压缩机哪的厂家多 浏览:390
手指速算法24加7怎么算 浏览:139
如何用python写vlookup函数 浏览:798
社保加密狗厂商 浏览:216
php编译运行说法 浏览:957
程序员说喂 浏览:258
抖音直播云服务器 浏览:629
一加7pro文件夹data 浏览:426
程序员淋雨 浏览:967