文章目录
- 多线程
- 多线程引入
- 2种方式创建线程
- 线程间的通信
- 并发控制-Semaphore
- 线程池
- done()
- result()
- cancel()
- as_completed()
- map()
- wait()
- with语句
多线程
多线程引入
之前我有写过一篇关于 多线程和多进程的文章:
- 31天Python入门——第24天:挑战一口气把多线程和多进程讲明白
没看过的老铁, 可以先阅读上面这篇文章, 这样的话就有了基础, 方便下面内容的学习.
多线程: 适用于 IO 密集型代码
多进程: 适用于 CPU 密集型代码
爬虫, 主要是 网络IO.
GIL 遇到 IO操作的时候, 会释放.
ok, 有了前面的基础, 下面我们看这段 多线程代码:
print('主线程执行代码')# 从 threading 库中导入Thread类fromthreadingimportThreadfromtimeimportsleep# 定义一个函数,作为新线程执行的入口函数defthreadFunc(arg1,arg2):print('子线程 开始')print(f'线程函数参数是:{arg1},{arg2}')sleep(5)print('子线程 结束')# 创建 Thread 类的实例对象thread=Thread(# target 参数 指定 新线程要执行的函数# 注意,这里指定的函数对象只能写一个名字,不能后面加括号,# 如果加括号就是直接在当前线程调用执行,而不是在新线程中执行了target=threadFunc,# 如果 新线程函数需要参数,在 args里面填入参数# 注意参数是元组, 如果只有一个参数,后面要有逗号,像这样 args=('参数1',)args=('参数1','参数2'))# 执行start 方法,就会创建新线程,# 并且新线程会去执行入口函数里面的代码。# 这时候 这个进程 有两个线程了。thread.start()# 主线程的代码执行 子线程对象的join方法,# 就会等待子线程结束,才继续执行下面的代码thread.join()print('主线程结束')这里有两点需要强调说明:
- 创建了 Thread 实例对象之后, 这时新的线程还没有创建, 需要调用 Thread 对象的 start()方法, 这样新的线程才创建成功, 并开始执行 threadFunc 入口函数里面的代码.
- 有的时候, 一个线程需要等待其他线程结束, 比如需要根据其他线程运行结束后的结果进行处理, 这时可以使用 Thread 对象的 join() 方法, 等待对应的线程完成, 才执行后续代码.
好的, 理解了上面的代码之后, 就可以继续后面的学习了.
2种方式创建线程
写爬虫的时候, 有一个简单网站吗里面有 列表页和详情页.
importthreadingdefget_list_html(url):print('开始获取列表页')time.sleep(2)# 模拟网络请求.print('已经获取到获取列表页')defget_detail_html(url):print('开始获取详情页')time.sleep(2)# 模拟网络请求.print('已经获取到获取详情页')url='www.baidu.com't1=threading.Thread(target=get_list_html,args=(url,))t2=threading.Thread(target=get_detail_html,args=(url,))t1.start()t2.start()t1.join()t2.join()运行结果是:
开始获取列表页 开始获取详情页 已经获取到列表页 已经获取到详情页这其实是 函数式创建线程的方式, 还有一种 面向对象式 通过继承 threading.Thread 类并重写 run()方法的方式.
ok, 下面我们用面向对象式的方式对上面的代码进行改写:
importthreadingimporttimeclassListHtmlThread(threading.Thread):def__init__(self,url):super().__init__()self.url=url# 保存参数到实例属性defrun(self):print('开始获取列表页')time.sleep(2)# 模拟网络请求print('已经获取到列表页')classDetailHtmlThread(threading.Thread):def__init__(self,url):super().__init__()self.url=urldefrun(self):print('开始获取详情页')time.sleep(2)# 模拟网络请求print('已经获取到详情页')# 使用url='www.baidu.com't1=ListHtmlThread(url)t2=DetailHtmlThread(url)t1.start()t2.start()t1.join()t2.join()聪明的老铁可能已经发现了, 其实 run()方法 就是在线程中运行的代码体, 也就相当于 函数式创建线程时的 传给target=的那个函数.
之前说过 daemon线程 的概念(也叫守护线程), 前面函数式代码运行也是一样, 主线程得等2秒后子线程结束, 主线程才能结束.
这是因为:
Python 程序中当所有的 非daemon线程 都结束了, 整个程序才会结束.
主线程是非daemon线程, 启动的子线程缺省也是 非daemon线程.
所以, 要等到主线程和子线程都结束, 程序才会结束.
我们可以在创建子线程的时候, 设置daemon参数值为 True, 比如之前的代码改写:
# 方式一: 在定义实例对象的时候设置 daemon=Truethread=Thread(target=threadFunc,daemon=True# 设置新线程为daemon线程)# 方式二: 在线程启动前调用 .setDeamon(True)方法thread=Thread(target=threadFunc)thread.setDaemon(True)# 需要注意的点: 守护线程要在线程启动之前设置thread.start()好的, 理解了上面函数式的, 那么面向对象式的就很简单了, 只要这样:
...t1=ListHtmlThread(url)t2=DetailHtmlThread(url)t3.setDaemon(True)t4.setDaemon(True)t1.start()t2.start()线程间的通信
在之前的文章中我们有说过 通过锁机制来对共享数据的访问控制, 忘记的老铁可以再回顾一下:
31天Python入门——第24天:挑战一口气把多线程和多进程讲明白
好的, 现在我们知道:
在多线程环境中, 所有线程共享同一个进程的内存空间. 但是由于线程切换得非常快, 因此多个线程同时访问和修改共享资源时, 很容易出现数据竞争问题, 这就可能发生数据不一致的错误.
为了避免这种问题, 需要使用锁或其他的同步机制 来确保同一时间只有一个线程可以访问共享资源.
接下来, 我们使用本身就是线程安全的数据结构 Queue, 通过它来代替共享数据的访问.
ok, 下面有一个生产者消费者模型 (生产者线程在生产, 消费者线程在消费) :
importthreadingimporttimefromqueueimportQueue,Empty# 通过安全的队列来通信.list_status=False# 生产者classGetListHtml(threading.Thread):def__init__(self,url_queue:Queue):super(GetListHtml,self).__init__()self.url_queue=url_queuedefrun(self):print('开始获取列表页')try:foriinrange(10):time.sleep(0.5)# 模拟网络请求.url=f'www.yrx.com/{i}'# Queue.join()里有一个计数器, 这一个计数器会在put的时候加1. 在每一次task_done的时候减1self.url_queue.put(url)print('已经获取到获取列表页')finally:globallist_status list_status=True# 消费者classGetDetailHtml(threading.Thread):def__init__(self,url_queue:Queue):super(GetDetailHtml,self).__init__()self.url_queue=url_queuedefrun(self):print('开始获取详情页')whileTrue:try:url=self.url_queue.get(timeout=1)# Queue.join()里有一个计数器, 这一个计数器会在put的时候加1. 在每一次task_done的时候减1self.url_queue.task_done()print(f'已经获取到获取详情页-{url}')time.sleep(1)# 模拟网络请求.# 当消费者消费比生产者快的时候,Queue里面的一定会出现为空的情况, 这时候消费者再来消费会出现出现Empty异常exceptEmpty:# 想一个办法, 让程序知道生产者已经结束了.globallist_statusprint(list_status)iflist_status:print('详情页已抓完')breakif__name__=='__main__':start_time=time.time()url_queue=Queue(100)# 一个生产者t1=GetListHtml(url_queue)# 两个消费者t2=GetDetailHtml(url_queue)t3=GetDetailHtml(url_queue)# 创建线程并启动t1.start()t2.start()t3.start()# 主线程阻塞等待子线程执行完毕t1.join()t2.join()t3.join()# join: 阻塞队列. 在主线程阻塞.# join里有一个计数器, 这一个计数器会在put的时候加1. 在每一次task_done的时候减1.# 当这个计数器是0的时候. 它会释放这个阻塞.url_queue.join()print(time.time()-start_time)运行结果是:
开始获取列表页 开始获取详情页 开始获取详情页 已经获取到获取列表页 已经获取到获取详情页-www.yrx.com/0 False 已经获取到获取列表页 已经获取到获取详情页-www.yrx.com/1 已经获取到获取列表页 已经获取到获取详情页-www.yrx.com/2 已经获取到获取列表页已经获取到获取详情页-www.yrx.com/3 已经获取到获取列表页已经获取到获取详情页-www.yrx.com/4 已经获取到获取列表页 已经获取到获取详情页-www.yrx.com/5 已经获取到获取列表页已经获取到获取详情页-www.yrx.com/6 已经获取到获取列表页 已经获取到获取详情页-www.yrx.com/7 已经获取到获取列表页已经获取到获取详情页-www.yrx.com/8 已经获取到获取列表页已经获取到获取详情页-www.yrx.com/9 True 详情页已抓完 True 详情页已抓完 7.050168991088867并发控制-Semaphore
先看下面的代码:
importthreadingimporttimeclassGetHtml(threading.Thread):def__init__(self,url):super().__init__()self.url=urldefrun(self):time.sleep(2)print(f'获取详情页成功:{self.url}')classSendUrl(threading.Thread):def__init__(self):super().__init__()defrun(self):foriinrange(30):url='www.yrx.com/{}'.format(i)get_html_thread=GetHtml(url=url)get_html_thread.start()if__name__=='__main__':send_url=SendUrl()send_url.start()运行结果是:
...... 获取详情页成功: www.yrx.com/9 获取详情页成功: www.yrx.com/11 获取详情页成功: www.yrx.com/12 获取详情页成功: www.yrx.com/19 获取详情页成功: www.yrx.com/23 获取详情页成功: www.yrx.com/22 获取详情页成功: www.yrx.com/28 获取详情页成功: www.yrx.com/29如果 for i in range(30): 创建的线程数更多呢, 比如是 30000, 那么代码是有问题的, 系统很可能因为不会创建这么多线程而出现异常错误, 所以这个时候, 我们就需要使用 Semaphore 对并发数进行控制.
好的, 接下来我们使用 Semaphore 来控制并发.
改写代码如下:
# Semaphore, 用于控制并发, 锁.importthreadingimporttimeclassGetHtml(threading.Thread):def__init__(self,url,semaphore):super().__init__()self.url=url self.semaphore=semaphoredefrun(self):time.sleep(2)print(f'获取详情页成功:{self.url}')# 释放锁的位置self.semaphore.release()classSendUrl(threading.Thread):def__init__(self,semaphore:threading.Semaphore):super().__init__()self.semaphore=semaphoredefrun(self):foriinrange(30000):url='www.yrx.com/{}'.format(i)# 加锁的位置self.semaphore.acquire()get_html_thread=GetHtml(url=url,semaphore=self.semaphore)get_html_thread.start()if__name__=='__main__':# 使用 Semaphore 来控制并发数每次是5个线程semaphore=threading.Semaphore(5)send_url=SendUrl(semaphore)send_url.start()使用 threading.Semaphore(5) 对代码改写了之后, 即使在循环里设置了30000个线程, 每次并发数都是5个.
线程池
前面已经说了多线程, 为什么还需要线程池呢?
这是因为多线程在一些场景下有不足的地方, 我们先来看看 线程的执行顺序:
- 调用 start() 新建线程 ——> 准备就绪状态 ——> 等待操作系统的调度, CPU资源 (消耗资源, 系统资源, CPU, 内存)
- 遇到 IO 操作时, GIL会释放, 线程会切换, 这样的话又会执行上面的(调用start()新建线程…)
- 调用 run()方法执行完线程里的代码 (回收资源)
这样一来, 创建线程数很多的话, 频繁消耗资源、回收资源导致效率很低, 如果想要避免上面的问题, 可以看看下面线程池的实现.
首先, 线程池的好处是:
- 提升性能, 重用资源, 有效地避免了创建线程过多.
- 主线程中可以获取到某1个线程的返回值, 还可以获取线程的运行状态.
- 接口一致 (线程、进程和协程使用了一样的设计模式 .Future), 学习成本更低, 代码维护成本低.
ok, 有了上面的概念, 我们来看看这段线程池的基础代码:
importtimefromconcurrent.futuresimportThreadPoolExecutordeftask_cost_time(times):time.sleep(times)print(f'程序花费了{times}秒')returntimes executor=ThreadPoolExecutor(3)task1=executor.submit(task_cost_time,3)task2=executor.submit(task_cost_time,2)我们直接运行上面的代码, 会得到:
程序花费了2秒 程序花费了3秒需要注意的是, submit()方法是里是自动执行start()启动线程的, submit()方法的返回值是 Future 对象, 并且 submit()方法是不阻塞的. 也就是说, 主线程并不会 join 等待上面的两个线程执行结束才会执行后面的代码.
done()
我们使用 done()方法来判断线程是否执行完毕:
...print(task1.done())time.sleep(3.1)print(task1.done())运行结果是:
False 程序花费了2秒 程序花费了3秒 Trueresult()
主线程使用 result()方法 可以获取到某一个线程的返回值.
...print(111111111)print(task1.result())print(222222222)运行结果是:
111111111 程序花费了2秒 程序花费了3秒 3 222222222我们看到只有执行了 print(task1.result()) 得到了返回值3, 才打印 222222222, 也就是说, result()方法是阻塞的.
cancel()
使用 cancel()方法可以取消线程, 但是处于正在运行状态或者已完成的线程, 不允许取消.
我们将前面的代码改写:
...deftask_cost_time(times):time.sleep(times)print(f'程序花费了{times}秒')returntimes executor=ThreadPoolExecutor(1)task1=executor.submit(task_cost_time,3)task2=executor.submit(task_cost_time,2)print(task1.cancel())print(task2.cancel())运行结果是:
False True 程序花费了3秒此时并没有执行 task2, 它被取消运行了.
我们看一下 cancel()方法的源码, 可以看出:
处于正在运行状态或者已完成的线程, 不允许取消.
defcancel(self):"""Cancel the future if possible. Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed. """withself._condition:ifself._statein[RUNNING,FINISHED]:returnFalseifself._statein[CANCELLED,CANCELLED_AND_NOTIFIED]:returnTrueself._state=CANCELLED self._condition.notify_all()self._invoke_callbacks()returnTrueas_completed()
as_completed()方法返回的是生成器(特殊的迭代器).
importthreadingimporttimefromconcurrent.futuresimportThreadPoolExecutor,as_completeddeftask_cost_time(times):time.sleep(times)print(f'程序花费了{times}秒')returntimes executor=ThreadPoolExecutor(3)time_list=[3,2,4]# tasks: 一个Future的列表.tasks=[executor.submit(task_cost_time,t)fortintime_list]# as_completed, fs就是Futuresforfinas_completed(tasks):print(f.result())运行结果是:
程序花费了2秒 2 程序花费了3秒 3 程序花费了4秒 4我们发现 as_completed()方法 先请求好的先给出结果.
map()
map()方法和 as_completed()方法的功能很相似, 但是map()方法不是先请求好的先给出结果, 而是按照生成的 Future列表中的Future对象的顺序执行.
importthreadingimporttimefromconcurrent.futuresimportThreadPoolExecutordeftask_cost_time(times):time.sleep(times)print(f'程序花费了{times}秒')returntimes executor=ThreadPoolExecutor(3)time_list=[3,2,4]tasks=executor.map(task_cost_time,time_list)forfintasks:print(f)运行结果是:
程序花费了2秒 程序花费了3秒 3 2 程序花费了4秒 4map()方法的源码是:
defmap(self,fn,*iterables,timeout=None,chunksize=1):iftimeoutisnotNone:end_time=timeout+time.monotonic()fs=[self.submit(fn,*args)forargsinzip(*iterables)]# Yield must be hidden in closure so that the futures are submitted# before the first iterator value is required.defresult_iterator():try:# reverse to keep finishing orderfs.reverse()whilefs:# Careful not to keep a reference to the popped futureiftimeoutisNone:yieldfs.pop().result()else:yieldfs.pop().result(end_time-time.monotonic())finally:forfutureinfs:future.cancel()returnresult_iterator()其中 fs.reverse() 是将 Future列表中的Future对象反转过来, 这样在执行后面的 yield fs.pop().result() 代码时, 会按照生成的 Future列表中的Future对象的顺序执行(因为前面说过result()方法是阻塞执行的).
wait()
如果我们想要实现类似于之前的代码中 join()方法的功能, 比如主线程要等到子线程全部执行完毕之后, 才能执行主线程后面的代码.
这个时候可以使用 wait()方法:
importtimefromconcurrent.futuresimportThreadPoolExecutor,waitdeftask_cost_time(times):time.sleep(times)print(f'程序花费了{times}秒')returntimes executor=ThreadPoolExecutor(2)task1=executor.submit(task_cost_time,3)task2=executor.submit(task_cost_time,2)wait([task1,task2])print(111111)运行结果是:
程序花费了2秒 程序花费了3秒 111111我们来看看 wait()方法的定义:
defwait(fs,timeout=None,return_when=ALL_COMPLETED):缺省情况下 return_when的值是ALL_COMPLETED, 也就是在所有的线程都执行完毕后才返回.
如果想要实现完成了一个线程就返回, 只需要这样实现:
...executor=ThreadPoolExecutor(2)task1=executor.submit(task_cost_time,3)task2=executor.submit(task_cost_time,2)wait([task1,task2],return_when=FIRST_COMPLETED)print(111111)运行结果是:
程序花费了2秒 111111 程序花费了3秒wait([task1, task2], return_when=FIRST_COMPLETED) 这个时候只要完成了一个线程就会返回.
with语句
之前有在Python魔法方法一文中的__enter__和__exit__方法里讲到这块的知识, 老铁们可以回顾一下:
- 31天Python入门——第20天:魔法方法详解
with语句, 上下文管理器.
当一个类里定义了 enter方法和exit方法, 那么这个类就可以被叫做上下文管理器, 它遵循Python中的上下文管理器协议.
__enter__(self)方法:
在进入with语句代码块时被调用, 用于执行一些准备操作.- 可以返回一个值, 该值将被赋给
as关键字之后的变量.
__exit__(self, exc_type, exc_value, exc_traceback)方法:
- 在退出with语句代码块时调用, 无论代码块是否发生异常.
- 可以用于执行一些清理操作, 如资源的释放.
- 如果代码块内没有异常发生, 参数值为 None, None, None.如果有异常, 参数包含异常类型、异常实例和跟踪信息.
- 如果
__exit__方法返回 True, 异常不会向上继续传播.返回 False 则异常会继续向上抛.
比如这段代码:
classMyRequest:def__enter__(self):print(11111111)returnselfdef__exit__(self,exc_type,exc_val,exc_tb):passwithMyRequest()asmr:print(mr)运行结果是:
11111111 <__main__.MyRequest object at 0x7faed014c160>也就是:
在进入with语句代码块时执行enter()方法, 打印11111111, 并返回了实例对象, 而且这个返回值会被赋给as关键字之后的变量mr.
如果with语句代码块中有异常错误:
classMyRequest:def__enter__(self):returnselfdef__exit__(self,exc_type,exc_val,exc_tb):print(2222222)# return TruewithMyRequest()asmr:1/0运行结果是:
2222222 ZeroDivisionError: division by zero如果不想再向上抛出异常, 只需要在 exit()方法中返回True.