1. python多线程几种方法实现
Python进阶(二十六)-多线程实现同步的四种方式
临界资源即那些一次只能被一个线程访问的资源,典型例子就是打印机,它一次只能被一个程序用来执行打印功能,因为不能多个线程同时操作,而访问这部分资源的代码通常称之为临界区。
锁机制
threading的Lock类,用该类的acquire函数进行加锁,用realease函数进行解锁
import threadingimport timeclass Num:
def __init__(self):
self.num = 0
self.lock = threading.Lock() def add(self):
self.lock.acquire()#加锁,锁住相应的资源
self.num += 1
num = self.num
self.lock.release()#解锁,离开该资源
return num
n = Num()class jdThread(threading.Thread):
def __init__(self,item):
threading.Thread.__init__(self)
self.item = item def run(self):
time.sleep(2)
value = n.add()#将num加1,并输出原来的数据和+1之后的数据
print(self.item,value)for item in range(5):
t = jdThread(item)
t.start()
t.join()#使线程一个一个执行
当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“同步阻塞”(参见多线程的基本概念)。
直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。
信号量
信号量也提供acquire方法和release方法,每当调用acquire方法的时候,如果内部计数器大于0,则将其减1,如果内部计数器等于0,则会阻塞该线程,知道有线程调用了release方法将内部计数器更新到大于1位置。
import threadingimport timeclass Num:
def __init__(self):
self.num = 0
self.sem = threading.Semaphore(value = 3) #允许最多三个线程同时访问资源
def add(self):
self.sem.acquire()#内部计数器减1
self.num += 1
num = self.num
self.sem.release()#内部计数器加1
return num
n = Num()class jdThread(threading.Thread):
def __init__(self,item):
threading.Thread.__init__(self)
self.item = item def run(self):
time.sleep(2)
value = n.add()
print(self.item,value)for item in range(100):
2. python实现多线程并发执行
由于停服维护的需求(服务越来越多的原因),此前编写的shell脚本执行速度缓慢(for循环,这就会很慢),为提高执行速度,参考很多资料,完成此脚本,实现并发执行机制.(当然这是测试脚本,有需要的同学,拿去改ba改ba,应该就可以用了)
此处脚本参考了 https://www.jb51.net/article/86053.htm
3. python多线程并行计算通过向线程池ThreadPoolExecutor提交任务的实现方法
Python的线程池可以有效地控制系统中并发线程的数量。
当程序中需要创建许多生存期较短的线程执行运算任务时,首先考虑使用线程池。线程池任务启动时会创建出最大线程数参数 max_workers 指定数量的空闲线程,程序只要将执行函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。配合使用 with 关键字实现任务队列完成后自动关闭线程池释放资源。
4. python多线程并发数量控制
python多线程如果不进行并发数量控制,在启动线程数量多到一定程度后,会造成线程无法启动的错误。
控制多线程并发数量的方法有好几钟,下面介绍用queue控制多线程并发数量的方法。python3
5. Python的多进程模块multiprocessing
众所周知,Python中不存在真正的多线程,Python中的多线程是一个并发过程。如果想要并行的执行程序,充分的利用cpu资源(cpu核心),还是需要使用多进程解决的。其中multiprocessing模块应该是Python中最常用的多进程模块了。
基本上multiprocessing这个模块和threading这个模块用法是相同的,也是可以通过函数和类创建进程。
上述案例基本上就是笔者搬用了上篇文章多线程的案例,可见其使用的相似之处。导入multiprocessing后实例化Process就可以创建一个进程,参数的话也是和多线程一样,target放置进程执行函数,args存放该函数的参数。
使用类来创建进程也是需要先继承multiprocessing.Process并且实现其init方法。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求。
但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程。
需要注意的是,在调用join方法阻塞进程前,需要先调用close方法,,否则程序会出错。
在上述案例中,提到了非阻塞,当把创建进程的方法换为pool.apply(func, (msg,))时,就会阻塞进程,出现下面的状况。
在multiprocessing模块中还存在Queue对象,这是一个进程的安全队列,近似queue.Queue。队列一般也是需要配合多线程或者多进程使用。
下列案例是一个使用进程队列实现的生产者消费者模式。
multiprocessing支持两种进程间的通信,其中一种便是上述案例的队列,另一种则称作管道。在官方文档的描述中,multiprocessing中的队列是基于管道实现的,并且拥有更高的读写效率。
管道可以理解为进程间的通道,使用Pipe([plex])创建,并返回一个元组(conn1,conn2)。如果plex被置为True(默认值),那么该管道是双向的,如果plex被置为False,那么该管道是单向的,即conn1只能用于接收消息,而conn2仅能用于发送消息。
其中conn1、conn2表示管道两端的连接对象,每个连接对象都有send()和recv()方法。send和recv方法分别是发送和接受消息的方法。例如,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。
关于multiprocessing模块其实还有很多实用的类和方法,由于篇幅有限(懒),笔者就先写到这里。该模块其实用起来很像threading模块,像锁对象和守护线程(进程)等multiprocessing模块也是有的,使用方法也近乎相同。
如果想要更加详细的了解multiprocessing模块,请参考官方文档。
6. Python中的多进程与多线程/分布式该如何使用
Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情。
借助这个包,可以轻松完成从单进程到并发执行的转换。
1、新建单一进程
如果我们新建少量进程,可以如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=("hello", ))
p.start()
p.join()
print "Sub-process done."12345678910111213
2、使用进程池
是的,你没有看错,不是线程池。它可以让你跑满多核CPU,而且使用方法非常简单。
注意要用apply_async,如果落下async,就变成阻塞版本了。
processes=4是最多并发进程数量。
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."12345678910111213141516
3、使用Pool,并需要关注结果
更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
return "done " + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print res.get()
print "Sub-process(es) done."
2014.12.25更新
根据网友评论中的反馈,在Windows下运行有可能崩溃(开启了一大堆新窗口、进程),可以通过如下调用来解决:
multiprocessing.freeze_support()1
附录(自己的脚本):
#!/usr/bin/python
import threading
import subprocess
import datetime
import multiprocessing
def dd_test(round, th):
test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)
command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg
print command
subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)
def mds_stat(round):
p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)
out = p.stdout.readlines()
if out[0].find('active') != -1:
command = "echo '0205pm %s round mds status OK, %s' >> /round_record" %(round, datetime.datetime.now())
command_2 = "time (ls /zbkc/test_mds_crash/) 2>>/round_record"
command_3 = "ls /zbkc/test_mds_crash | wc -l >> /round_record"
subprocess.call(command,shell=True)
subprocess.call(command_2,shell=True)
subprocess.call(command_3,shell=True)
return 1
else:
command = "echo '0205 %s round mds status abnormal, %s, %s' >> /round_record" %(round, out[0], datetime.datetime.now())
subprocess.call(command,shell=True)
return 0
#threads = []
for round in range(1, 1600):
pool = multiprocessing.Pool(processes = 10) #使用进程池
for th in range(10):
# th_name = "thread-" + str(th)
# threads.append(th_name) #添加线程到线程列表
# threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #创建多线程任务
pool.apply_async(dd_test, (round, th))
pool.close()
pool.join()
#等待线程完成
# for t in threads:
# t.join()
if mds_stat(round) == 0:
subprocess.call("zbkc -s",shell=True)
break
7. Python中级精华-并发之启动和停止线程
为了让代码能够并发执行,向创建线程并在核实的时候销毁它。
由于目的比较单纯,只是讲解基础的线程创建方法,所以可以直接使用threading库中的Thread类来实例化一个线程对象。
例子,用户输入两个数字,并且求其两个数字的四则运算的结果:
除了以上的一些功能以外,在python线程
中没有其他的诸如给线程发信号、设置线程调度属性、执行任何其他高级操作的功能了,如果需要这些功能,就需要手工编写了。
另外,需要注意的是,由于GIL(全局解释器锁)的存在,限制了在python解释器当中只允许运行一个线程。基于这个原因,不停该使用python线程来处理计算密集型的任务,因为在这种任务重我们希望在多个CPU核心上实现并行处理。Python线程更适合于IO处理以及设计阻塞操作的并发执行任务(即等待IO响应或等待数据库取出结果等)。
如何判断线程是否已经启动?
目的:我们加载了一个线程,但是想要知道这个线程什么时候才会开始运行?
方法:
线程的核心特征我认为就是不确定性,因为其什么时候开始运行,什么时候被打断,什么时候恢复执行,这不是程序员能够控制的,而是有系统调度
来完成的。如果遇到像某个线程的运行依托于其他某个线程运行到某个状态时该线程才能开始运行,那么这就是线程同步
问题,同样这个问题非常棘手。要解决这类问题我们要借助threading中的Event对象。
Event其实和条件标记类似,匀速线程
等待某个时间发生。初始状态时事件被设置成0。如果事件没有被设置而线程正在等待该事件,那么线程就会被阻塞,直到事件被设置位置,当有线程设置了这个事件之后,那么就会唤醒正在等待事件的线程,如果线程等待的事件已经设置了,那么线程会继续执行。
一个例子:
如上能够确定的是,主线程会在线程t运行结束时再运行。
8. Python如何实现并行的多线程
Python中使用线程有两种方式:函数或者用类来包装线程对象。函数式:调用thread模块中的start_new_thread()函数来产生新线程。线程模块:Python通过两个标准库thread和threading提供对线程的支持。
9. python stackless 怎么多线程并发
1 介绍
1.1 为什么要使用Stackless
摘自stackless网站。
Note
Stackless Python 是Python编程语言的一个增强版本,它使程序员从基于线程的编程方式中获得好处,并避免传统线程所带来的性能与复杂度问题。Stackless为 Python带来的微线程扩展,是一种低开销、轻量级的便利工具,如果使用得当,可以获益如下:
改进程序结构
增进代码可读性
提高编程人员生产力
以上是Stackless Python很简明的释义,但其对我们意义何在?——就在于Stackless提供的并发建模工具,比目前其它大多数传统编程语言所提供的,都更加易用: 不仅是Python自身,也包括Java、C++,以及其它。尽管还有其他一些语言提供并发特性,可它们要么是主要用于学术研究的(如 Mozart/Oz),要么是罕为使用、或用于特殊目的的专业语言(如Erlang)。而使用stackless,你将会在Python本身的所有优势之 上,在一个(但愿)你已经很熟悉的环境中,再获得并发的特性。
这自然引出了个问题:为什么要并发?
1.1.1 现实世界就是并发的
现实世界就是“并发”的,它是由一群事物(或“演员”)所组成,而这些事物以一种对彼此所知有限的、松散耦合的方式相互作用。传说中面向对象编程有 一个好处,就是对象能够对现实的世界进行模拟。这在一定程度上是正确的,面向对象编程很好地模拟了对象个体,但对于这些对象个体之间的交互,却无法以一种 理想的方式来表现。例如,如下代码实例,有什么问题?
第一印象,没问题。但是,上例中存在一个微妙的安排:所有事件是次序发生的,即:直到丈夫吃完饭,妻子才开始吃;儿子则一直等到母亲吃完才吃;而女 儿则是最后一个。在现实世界中,哪怕是丈夫还堵车在路上,妻子、儿子和女儿仍然可以该吃就吃,而要在上例中的话,他们只能饿死了——甚至更糟:永远没有人 会知道这件事,因为他们永远不会有机会抛出一个异常来通知这个世界!
1.1.2 并发可能是(仅仅可能是)下一个重要的编程范式
我个人相信,并发将是软件世界里的下一个重要范式。随着程序变得更加复杂和耗费资源,我们已经不能指望摩尔定律来每年给我们提供更快的CPU了,当 前,日常使用的个人计算机的性能提升来自于多核与多CPU机。一旦单个CPU的性能达到极限,软件开发者们将不得不转向分布式模型,靠多台计算机的互相协 作来建立强大的应用(想想GooglePlex)。为了取得多核机和分布式编程的优势,并发将很快成为做事情的方式的事实标准。
1.2 安装stackless
安装Stackless的细节可以在其网站上找到。现在Linux用户可以通过SubVersion取得源代码并编译;而对于Windows用户, 则有一个.zip文件供使用,需要将其解压到现有的Python安装目录中。接下来,本教程假设Stackless Python已经安装好了,可以工作,并且假设你对Python语言本身有基本的了解。
2 stackless起步
本章简要介绍了stackless的基本概念,后面章节将基于这些基础,来展示更加实用的功能。
2.1 微进程(tasklet)
微进程是stackless的基本构成单元,你可以通过提供任一个Python可调用对象(通常为函数或类的方法)来建立它,这将建立一个微进程并将其添加到调度器。这是一个快速演示:
注意,微进程将排起队来,并不运行,直到调用stackless.run()。
2.2 调度器(scheler)
调度器控制各个微进程运行的顺序。如果刚刚建立了一组微进程,它们将按照建立的顺序来执行。在现实中,一般会建立一组可以再次被调度的微进程,好让每个都有轮次机会。一个快速演示:
注意:当调用stackless.schele()的时候,当前活动微进程将暂停执行,并将自身重新插入到调度器队列的末尾,好让下一个微进程被执行。一旦在它前面的所有其他微进程都运行过了,它将从上次 停止的地方继续开始运行。这个过程会持续,直到所有的活动微进程都完成了运行过程。这就是使用stackless达到合作式多任务的方式。
2.3 通道(channel)
通道使得微进程之间的信息传递成为可能。它做到了两件事:
能够在微进程之间交换信息。
能够控制运行的流程。
又一个快速演示:
接收的微进程调用channel.receive()的时候,便阻塞住,这意味着该微进程暂停执行,直到有信息从这个通道送过来。除了往这个通道发送信息以外,没有其他任何方式可以让这个微进程恢复运行。
若有其他微进程向这个通道发送了信息,则不管当前的调度到了哪里,这个接收的微进程都立即恢复执行;而发送信息的微进程则被转移到调度列表的末尾,就像调用了stackless.schele()一样。
同样注意,发送信息的时候,若当时没有微进程正在这个通道上接收,也会使当前微进程阻塞:
发送信息的微进程,只有在成功地将数据发送到了另一个微进程之后,才会重新被插入到调度器中。
2.4 总结
以上涵盖了stackless的大部分功能。似乎不多是吧?——我们只使用了少许对象,和大约四五个函数调用,来进行操作。但是,使用这种简单的API作为基本建造单元,我们可以开始做一些真正有趣的事情。
3 协程(coroutine)
3.1 子例程的问题
大多数传统编程语言具有子例程的概念。一个子例程被另一个例程(可能还是其它某个例程的子例程)所调用,或返回一个结果,或不返回结果。从定义上说,一个子例程是从属于其调用者的。
见下例:
有经验的编程者会看到这个程序的问题所在:它导致了堆栈溢出。如果运行这个程序,它将显示一大堆讨厌的跟踪信息,来指出堆栈空间已经耗尽。
3.1.1 堆栈
我仔细考虑了,自己对C语言堆栈的细节究竟了解多少,最终还是决定完全不去讲它。似乎,其他人对其所尝试的描述,以及图表,只有本身已经理解了的人才能看得懂。我将试着给出一个最简单的说明,而对其有更多兴趣的读者可以从网上查找更多信息。
每当一个子例程被调用,都有一个“栈帧”被建立,这是用来保存变量,以及其他子例程局部信息的区域。于是,当你调用 ping() ,则有一个栈帧被建立,来保存这次调用相关的信息。简言之,这个帧记载着 ping 被调用了。当再调用 pong() ,则又建立了一个栈帧,记载着 pong 也被调用了。这些栈帧是串联在一起的,每个子例程调用都是其中的一环。就这样,堆栈中显示: ping 被调用所以 pong 接下来被调用。显然,当 pong() 再调用 ping() ,则使堆栈再扩展。下面是个直观的表示:
帧 堆栈
1 ping 被调用
2 ping 被调用,所以 pong 被调用
3 ping 被调用,所以 pong 被调用,所以 ping 被调用
4 ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用
5 ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用,所以 ping 被调用
6 ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用,所以 ping 被调用……
现在假设,这个页面的宽度就表示系统为堆栈所分配的全部内存空间,当其顶到页面的边缘的时候,将会发生溢出,系统内存耗尽,即术语“堆栈溢出”。
3.1.2 那么,为什么要使用堆栈?
上例是有意设计的,用来体现堆栈的问题所在。在大多数情况下,当每个子例程返回的时候,其栈帧将被清除掉,就是说堆栈将会自行实现清理过程。这一般 来说是件好事,在C语言中,堆栈就是一个不需要编程者来手动进行内存管理的区域。很幸运,Python程序员也不需要直接来担心内存管理与堆栈。但是由于 Python解释器本身也是用C实现的,那些实现者们可是需要担心这个的。使用堆栈是会使事情方便,除非我们开始调用那种从不返回的函数,如上例中的,那 时候,堆栈的表现就开始和程序员别扭起来,并耗尽可用的内存。
3.2 走进协程
此时,将堆栈弄溢出是有点愚蠢的。 ping() 和 pong() 本不是真正意义的子例程,因为其中哪个也不从属于另一个,它们是“协程”,处于同等的地位,并可以彼此间进行无缝通信。
帧 堆栈
1 ping 被调用
2 pong 被调用
3 ping 被调用
4 pong 被调用
5 ping 被调用
6 pong 被调用
在stackless中,我们使用通道来建立协程。还记得吗,通道所带来的两个好处中的一个,就是能够控制微进程之间运行的流程。使用通道,我们可以在 ping 和 pong 这两个协程之间自由来回,要多少次就多少次,都不会堆栈溢出:
你可以运行这个程序要多久有多久,它都不会崩溃,且如果你检查其内存使用量(使用Windows的任务管理器或Linux的top命令),将会发现 使用量是恒定的。这个程序的协程版本,不管运行一分钟还是一天,使用的内存都是一样的。而如果你检查原先那个递归版本的内存用量,则会发现其迅速增长,直 到崩溃。
3.3 总结
是否还记得,先前我提到过,那个代码的递归版本,有经验的程序员会一眼看出毛病。但老实说,这里面并没有什么“计算机科学”方面的原因在阻碍它的正 常工作,有些让人坚信的东西,其实只是个与实现细节有关的小问题——只因为大多数传统编程语言都使用堆栈。某种意义上说,有经验的程序员都是被洗了脑,从 而相信这是个可以接受的问题。而stackless,则真正察觉了这个问题,并除掉了它。
4 轻量级线程
与当今的操作系统中内建的、和标准Python代码中所支持的普通线程相比,“微线程”要更为轻量级,正如其名称所暗示。它比传统线程占用更少的内存,并且微线程之间的切换,要比传统线程之间的切换更加节省资源。
为了准确说明微线程的效率究竟比传统线程高多少,我们用两者来写同一个程序。
4.1 hackysack模拟
Hackysack是一种游戏,就是一伙脏乎乎的小子围成一个圈,来回踢一个装满了豆粒的沙包,目标是不让这个沙包落地,当传球给别人的时候,可以耍各种把戏。踢沙包只可以用脚。
在我们的简易模拟中,我们假设一旦游戏开始,圈里人数就是恒定的,并且每个人都是如此厉害,以至于如果允许的话,这个游戏可以永远停不下来。
4.2 游戏的传统线程版本