1. python之多线程原理
并发:逻辑上具备同时处理多个任务的能力。
并行:物理上在同一时刻执行多个并发任务。
举例:开个QQ,开了一个进程,开了微信,开了一个进程。在QQ这个进程里面,传输文字开一个线程、传输语音开了一个线程、弹出对话框又开了一个线程。
总结:开一个软件,相当于开了一个进程。在这个软件运行的过程里,多个工作同时运转,完成了QQ的运行,那么这个多个工作分别有多个线程。
线程和进程之间的区别:
进程在python中的使用,对模块threading进行操作,调用的这个三方库。可以通过 help(threading) 了解其中的方法、变量使用情况。也可以使用 dir(threading) 查看目录结构。
current_thread_num = threading.active_count() # 返回正在运行的线程数量
run_thread_len = len(threading.enumerate()) # 返回正在运行的线程数量
run_thread_list = threading.enumerate() # 返回当前运行线程的列表
t1=threading.Thread(target=dance) #创建两个子线程,参数传递为函数名
t1.setDaemon(True) # 设置守护进程,守护进程:主线程结束时自动退出子线程。
t1.start() # 启动子线程
t1.join() # 等待进程结束 exit()`# 主线程退出,t1子线程设置了守护进程,会自动退出。其他子线程会继续执行。
2. python多线程的几种方法
Python进阶(二十六)-多线程实现同步的四种方式
临界资源即那些一次只能被一个线程访问的资源,典型例子就是打印机,它一次只能被一个程序用来执行打印功能,因为不能多个线程同时操作,而访问这部分资源的代码通常称之为临界区。
锁机制
threading的Lock类,用该类的acquire函数进行加锁,用realease函数进行解锁
import threadingimport timeclass Num:
def __init__(self):
self.num = 0
self.lock = threading.Lock() def add(self):
self.lock.acquire()#加锁,锁住相应的资源
self.num += 1
num = self.num
self.lock.release()#解锁,离开该资源
return num
n = Num()class jdThread(threading.Thread):
def __init__(self,item):
threading.Thread.__init__(self)
self.item = item def run(self):
time.sleep(2)
value = n.add()#将num加1,并输出原来的数据和+1之后的数据
print(self.item,value)for item in range(5):
t = jdThread(item)
t.start()
t.join()#使线程一个一个执行
当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“同步阻塞”(参见多线程的基本概念)。
直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。
信号量
信号量也提供acquire方法和release方法,每当调用acquire方法的时候,如果内部计数器大于0,则将其减1,如果内部计数器等于0,则会阻塞该线程,知道有线程调用了release方法将内部计数器更新到大于1位置。
import threadingimport timeclass Num:
def __init__(self):
self.num = 0
self.sem = threading.Semaphore(value = 3) #允许最多三个线程同时访问资源
def add(self):
self.sem.acquire()#内部计数器减1
self.num += 1
num = self.num
self.sem.release()#内部计数器加1
return num
n = Num()class jdThread(threading.Thread):
def __init__(self,item):
threading.Thread.__init__(self)
self.item = item def run(self):
time.sleep(2)
value = n.add()
print(self.item,value)for item in range(100):
3. python基础(21)-线程通信
到这里,我们要聊一下线程通信的内容;
首先,我们抛开语言不谈,先看看比较基础的东西,线程间通信的方式;其实也就是哪几种(我这里说的,是我的所谓的知道的。。。)事件,消息队列,信号量,条件变量(锁算不算?我只是认为是同步的一种);所以我们也就是要把这些掌握了,因为各有各的好处嘛;
条件变量我放到了上面的线程同步里面讲了,我总感觉这算是同步的一种,没有很多具体信息的沟通;同时吧,我认为条件变量比较重要,因为这种可以应用于线程池的操作上;所以比较重要;这里,抛开条件变量不谈,我们看看其他的东西;
1、消息队列:
queue 模块下提供了几个阻塞队列,这些队列主要用于实现线程通信。在 queue 模块下主要提供了三个类,分别代表三种队列,它们的主要区别就在于进队列、出队列的不同。
关于这三个队列类的简单介绍如下:
queue.Queue(maxsize=0):代表 FIFO(先进先出)的常规队列,maxsize 可以限制队列的大小。如果队列的大小达到队列的上限,就会加锁,再次加入元素时就会被阻塞,直到队列中的元素被消费。如果将 maxsize 设置为 0 或负数,则该队列的大小就是无限制的。
queue.LifoQueue(maxsize=0):代表 LIFO(后进先出)的队列,与 Queue 的区别就是出队列的顺序不同。
PriorityQueue(maxsize=0):代表优先级队列,优先级最小的元素先出队列。
这三个队列类的属性和方法基本相同, 它们都提供了如下属性和方法:
Queue.qsize():返回队列的实际大小,也就是该队列中包含几个元素。
Queue.empty():判断队列是否为空。
Queue.full():判断队列是否已满。
Queue.put(item, block=True, timeout=None):向队列中放入元素。如果队列己满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到该队列的元素被消费;如果队列己满,且 block 参数为 False(不阻塞),则直接引发 queue.FULL 异常。
Queue.put_nowait(item):向队列中放入元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。
Queue.get(item, block=True, timeout=None):从队列中取出元素(消费元素)。如果队列已满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到有元素被放入队列中; 如果队列己空,且 block 参数为 False(不阻塞),则直接引发 queue.EMPTY 异常。
Queue.get_nowait(item):从队列中取出元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。
其实我们想想,这个队列,是python进行封装的,那么我们可以用在线程间的通信;同时也是可以用做一个数据结构;先进先出就是队列,后进先出就是栈;我们用这个栈写个十进制转二进制的例子:
没毛病,可以正常的打印;其中需要注意的就是,maxsize在初始化的时候如果是0或者是个负数的话,那么就会是不限制大小;
那么其实我们想想,我们如果用做线程通信的话,我们两个线程,可以把队列设置为1的大小,如果是1对多,比如是创建者和消费者的关系,我们完全可以作为消息队列,比如说创建者一直在创建一些东西,然后放入到消息队列里面,然后供消费着使用;就是一个很好的例子;所以,其实说是消息队列,也就是队列,没差;
=====================================================================
下面来看一下事件
Event 是一种非常简单的线程通信机制,一个线程发出一个 Event,另一个线程可通过该 Event 被触发。
Event 本身管理一个内部旗标,程序可以通过 Event 的 set() 方法将该旗标设置为 True,也可以调用 clear() 方法将该旗标设置为 False。程序可以调用 wait() 方法来阻塞当前线程,直到 Event 的内部旗标被设置为 True。
Event 提供了如下方法:
is_set():该方法返回 Event 的内部旗标是否为True。
set():该方法将会把 Event 的内部旗标设置为 True,并唤醒所有处于等待状态的线程。
clear():该方法将 Event 的内部旗标设置为 False,通常接下来会调用 wait() 方法来阻塞当前线程。
wait(timeout=None):该方法会阻塞当前线程。
这里我想解释一下;其实对于事件来说,事件可以看成和条件变量是一样的,只是我们说说不一样的地方;
1、对于事件来说,一旦触发了事件,也就是说,一旦set为true了,那么就会一直为true,需要clear调内部的标志,才能继续wait;但是conditon不是,他是一次性的唤醒其他线程;
2、conditon自己带锁;事件呢?不是的;没有自己的锁;比如说有一个存钱的线程,有一个是取钱的线程;那么存钱的线程要存钱;需要怎么办呢?1、发现银行没有钱了(is_set判断);2、锁住银行;3、存钱;4、释放银行;5、唤醒事件;对于取钱的人;1、判断是否有钱;2、被唤醒了,然后锁住银行;3、开始取钱;4、清理告诉存钱的人,我没钱了(clear);5、释放锁;6、等着钱存进去;
其实说白了,就是记住一点;这个旗标需要自己clear就对了
写个例子,怕以后忘了怎么用;
其实时间和信号量比较像;但是信号量不用自己清除标志位;但是事件是需要的;
4. 小白都看懂了,Python 中的线程和进程精讲,建议收藏
目录
众所周知,CPU是计算机的核心,它承担了所有的计算任务。而操作系统是计算机的管理者,是一个大管家,它负责任务的调度,资源的分配和管理,统领整个计算机硬件。应用程序是具有某种功能的程序,程序运行与操作系统之上
在很早的时候计算机并没有线程这个概念,但是随着时代的发展,只用进程来处理程序出现很多的不足。如当一个进程堵塞时,整个程序会停止在堵塞处,并且如果频繁的切换进程,会浪费系统资源。所以线程出现了
线程是能拥有资源和独立运行的最小单位,也是程序执行的最小单位。一个进程可以拥有多个线程,而且属于同一个进程的多个线程间会共享该进行的资源
① 200 多本 Python 电子书(和经典的书籍)应该有
② Python标准库资料(最全中文版)
③ 项目源码(四五十个有趣且可靠的练手项目及源码)
④ Python基础入门、爬虫、网络开发、大数据分析方面的视频(适合小白学习)
⑤ Python学习路线图(告别不入流的学习)
私信我01即可获取大量Python学习资源
进程时一个具有一定功能的程序在一个数据集上的一次动态执行过程。进程由程序,数据集合和进程控制块三部分组成。程序用于描述进程要完成的功能,是控制进程执行的指令集;数据集合是程序在执行时需要的数据和工作区;程序控制块(PCB)包含程序的描述信息和控制信息,是进程存在的唯一标志
在Python中,通过两个标准库 thread 和 Threading 提供对线程的支持, threading 对 thread 进行了封装。 threading 模块中提供了 Thread , Lock , RLOCK , Condition 等组件
在Python中线程和进程的使用就是通过 Thread 这个类。这个类在我们的 thread 和 threading 模块中。我们一般通过 threading 导入
默认情况下,只要在解释器中,如果没有报错,则说明线程可用
守护模式:
现在我们程序代码中,有多个线程, 并且在这个几个线程中都会去 操作同一部分内容,那么如何实现这些数据的共享呢?
这时,可以使用 threading库里面的锁对象 Lock 去保护
Lock 对象的acquire方法 是申请锁
每个线程在操作共享数据对象之前,都应该申请获取操作权,也就是调用该共享数据对象对应的锁对象的acquire方法,如果线程A 执行了 acquire() 方法,别的线程B 已经申请到了这个锁, 并且还没有释放,那么 线程A的代码就在此处 等待 线程B 释放锁,不去执行后面的代码。
直到线程B 执行了锁的 release 方法释放了这个锁, 线程A 才可以获取这个锁,就可以执行下面的代码了
如:
到在使用多线程时,如果数据出现和自己预期不符的问题,就可以考虑是否是共享的数据被调用覆盖的问题
使用 threading 库里面的锁对象 Lock 去保护
Python中的多进程是通过multiprocessing包来实现的,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象。这个进程对象的方法和线程对象的方法差不多也有start(), run(), join()等方法,其中有一个方法不同Thread线程对象中的守护线程方法是setDeamon,而Process进程对象的守护进程是通过设置daemon属性来完成的
守护模式:
其使用方法和线程的那个 Lock 使用方法类似
Manager的作用是提供多进程共享的全局变量,Manager()方法会返回一个对象,该对象控制着一个服务进程,该进程中保存的对象运行其他进程使用代理进行操作
语法:
线程池的基类是 concurrent.futures 模块中的 Executor , Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor ,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池
如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定
Exectuor 提供了如下常用方法:
程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表
Future 提供了如下方法:
使用线程池来执行线程任务的步骤如下:
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
也可以低于 CPU 核心数
使用线程池来执行线程任务的步骤如下:
关于进程的开启代码一定要放在 if __name__ == '__main__': 代码之下,不能放到函数中或其他地方
开启进程的技巧
开启进程的数量最好低于最大 CPU 核心数
5. Python:进程(threading)
这里是自己写下关于 Python 跟进程相关的 threading 模块的一点笔记,跟有些跟 Linux 调用挺像的,有共通之处。
https://docs.python.org/3/library/threading.html?highlight=threading#thread-objects
直接传入
继承 Thread 重写 run 方法
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
group 线程组,未实现
start() 线程就绪
join([timeout]) 阻塞其他线程,直到调用这方法的进程结束或时间到达
RuntimeError: cannot join thread before it is started
get/setName(name) 获取/设置线程名。
isAlive() 返回线程是否在运行。
is/setDaemon(bool): 获取/设置是后台线程(默认前台线程(False))。(在start之前设置)
The entire Python program exits when no alive non-daemon threads are left.
没有非后台进程运行,Python 就退出。
主线程执行完毕后,后台线程不管是成功与否,主线程均停止
t.start()
t.join()
start() 后 join() 会顺序执行,失去线程意义
https://docs.python.org/3/library/threading.html?#lock-objects
Lock属于全局,Rlock属于线程(R的意思是可重入,线程用Lock的话会死锁,来看例子)
acquire(blocking=True, timeout=-1) 申请锁,返回申请的结果
release() 释放锁,没返回结果
https://docs.python.org/3/library/threading.html#condition-objects
可以在构造时传入rlock lock实例,不然自己生成一个。
acquire([timeout])/release(): 与lock rlock 相同
wait([timeout]): 调用这个方法将使线程进入等待池,并释放锁。调用方法前线程必须已获得锁定,否则将抛出异常。
notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。调用方法前线程必须已获得锁定,否则将抛出异常。
notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
threading.Semaphore(value=1)
https://docs.python.org/3/library/threading.html#semaphore-objects
acquire(blocking=True, timeout=None)
资源数大于0,减一并返回,等于0时等待,blocking为False不阻塞进程
返回值是申请结果
release()
资源数加1
https://docs.python.org/3/library/threading.html#event-objects
事件内置了一个初始为False的标志
is_set() 返回内置标志的状态
set() 设为True
clear() 设为False
wait(timeout=None) 阻塞线程并等待,为真时返回。返回值只会在等待超时时为False,其他情况为True
https://docs.python.org/3/library/threading.html#timer-objects
threading.Timer(interval, function, args=None, kwargs=None)
第一个参数是时间间隔,单位是秒,整数或者浮点数,负数不会报错直接执行不等待
可以用cancel() 取消
https://docs.python.org/3/library/threading.html#barrier-objects
threading.Barrier(parties, action=None, timeout=None)
调用的进程数目达到第一个设置的参数就唤醒全部进程
wait(timeout=None)
reset() 重置,等待中的进程收到 BrokenBarrierError 错误
6. Python多线程总结
在实际处理数据时,因系统内存有限,我们不可能一次把所有数据都导出进行操作,所以需要批量导出依次操作。为了加快运行,我们会采用多线程的方法进行数据处理, 以下为我总结的多线程批量处理数据的模板:
主要分为三大部分:
共分4部分对多线程的内容进行总结。
先为大家介绍线程的相关概念:
在飞车程序中,如果没有多线程,我们就不能一边听歌一边玩飞车,听歌与玩 游戏 不能并行;在使用多线程后,我们就可以在玩 游戏 的同时听背景音乐。在这个例子中启动飞车程序就是一个进程,玩 游戏 和听音乐是两个线程。
Python 提供了 threading 模块来实现多线程:
因为新建线程系统需要分配资源、终止线程系统需要回收资源,所以如果可以重用线程,则可以减去新建/终止的开销以提升性能。同时,使用线程池的语法比自己新建线程执行线程更加简洁。
Python 为我们提供了 ThreadPoolExecutor 来实现线程池,此线程池默认子线程守护。它的适应场景为突发性大量请求或需要大量线程完成任务,但实际任务处理时间较短。
其中 max_workers 为线程池中的线程个数,常用的遍历方法有 map 和 submit+as_completed 。根据业务场景的不同,若我们需要输出结果按遍历顺序返回,我们就用 map 方法,若想谁先完成就返回谁,我们就用 submit+as_complete 方法。
我们把一个时间段内只允许一个线程使用的资源称为临界资源,对临界资源的访问,必须互斥的进行。互斥,也称间接制约关系。线程互斥指当一个线程访问某临界资源时,另一个想要访问该临界资源的线程必须等待。当前访问临界资源的线程访问结束,释放该资源之后,另一个线程才能去访问临界资源。锁的功能就是实现线程互斥。
我把线程互斥比作厕所包间上大号的过程,因为包间里只有一个坑,所以只允许一个人进行大号。当第一个人要上厕所时,会将门上上锁,这时如果第二个人也想大号,那就必须等第一个人上完,将锁解开后才能进行,在这期间第二个人就只能在门外等着。这个过程与代码中使用锁的原理如出一辙,这里的坑就是临界资源。 Python 的 threading 模块引入了锁。 threading 模块提供了 Lock 类,它有如下方法加锁和释放锁:
我们会发现这个程序只会打印“第一道锁”,而且程序既没有终止,也没有继续运行。这是因为 Lock 锁在同一线程内第一次加锁之后还没有释放时,就进行了第二次 acquire 请求,导致无法执行 release ,所以锁永远无法释放,这就是死锁。如果我们使用 RLock 就能正常运行,不会发生死锁的状态。
在主线程中定义 Lock 锁,然后上锁,再创建一个子 线程t 运行 main 函数释放锁,结果正常输出,说明主线程上的锁,可由子线程解锁。
如果把上面的锁改为 RLock 则报错。在实际中设计程序时,我们会将每个功能分别封装成一个函数,每个函数中都可能会有临界区域,所以就需要用到 RLock 。
一句话总结就是 Lock 不能套娃, RLock 可以套娃; Lock 可以由其他线程中的锁进行操作, RLock 只能由本线程进行操作。