news 2026/4/23 11:44:18

「深入理解多线程编程」再谈线程

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
「深入理解多线程编程」再谈线程

你好,我是安然无虞。
和我一起,为高质量人生而不懈奋斗。

文章目录

  • 多线程
    • 多线程引入
    • 2种方式创建线程
    • 线程间的通信
    • 并发控制-Semaphore
  • 线程池
    • done()
    • result()
    • cancel()
    • as_completed()
    • map()
    • wait()
    • with语句

多线程

多线程引入

之前我有写过一篇关于 多线程和多进程的文章:

  • 31天Python入门——第24天:挑战一口气把多线程和多进程讲明白

没看过的老铁, 可以先阅读上面这篇文章, 这样的话就有了基础, 方便下面内容的学习.

  1. 多线程: 适用于 IO 密集型代码

  2. 多进程: 适用于 CPU 密集型代码

爬虫, 主要是 网络IO.

GIL 遇到 IO操作的时候, 会释放.

ok, 有了前面的基础, 下面我们看这段 多线程代码:

print('主线程执行代码')# 从 threading 库中导入Thread类fromthreadingimportThreadfromtimeimportsleep# 定义一个函数,作为新线程执行的入口函数defthreadFunc(arg1,arg2):print('子线程 开始')print(f'线程函数参数是:{arg1},{arg2}')sleep(5)print('子线程 结束')# 创建 Thread 类的实例对象thread=Thread(# target 参数 指定 新线程要执行的函数# 注意,这里指定的函数对象只能写一个名字,不能后面加括号,# 如果加括号就是直接在当前线程调用执行,而不是在新线程中执行了target=threadFunc,# 如果 新线程函数需要参数,在 args里面填入参数# 注意参数是元组, 如果只有一个参数,后面要有逗号,像这样 args=('参数1',)args=('参数1','参数2'))# 执行start 方法,就会创建新线程,# 并且新线程会去执行入口函数里面的代码。# 这时候 这个进程 有两个线程了。thread.start()# 主线程的代码执行 子线程对象的join方法,# 就会等待子线程结束,才继续执行下面的代码thread.join()print('主线程结束')

这里有两点需要强调说明:

  1. 创建了 Thread 实例对象之后, 这时新的线程还没有创建, 需要调用 Thread 对象的 start()方法, 这样新的线程才创建成功, 并开始执行 threadFunc 入口函数里面的代码.
  2. 有的时候, 一个线程需要等待其他线程结束, 比如需要根据其他线程运行结束后的结果进行处理, 这时可以使用 Thread 对象的 join() 方法, 等待对应的线程完成, 才执行后续代码.

好的, 理解了上面的代码之后, 就可以继续后面的学习了.

2种方式创建线程

写爬虫的时候, 有一个简单网站吗里面有 列表页和详情页.

importthreadingdefget_list_html(url):print('开始获取列表页')time.sleep(2)# 模拟网络请求.print('已经获取到获取列表页')defget_detail_html(url):print('开始获取详情页')time.sleep(2)# 模拟网络请求.print('已经获取到获取详情页')url='www.baidu.com't1=threading.Thread(target=get_list_html,args=(url,))t2=threading.Thread(target=get_detail_html,args=(url,))t1.start()t2.start()t1.join()t2.join()

运行结果是:

开始获取列表页 开始获取详情页 已经获取到列表页 已经获取到详情页

这其实是 函数式创建线程的方式, 还有一种 面向对象式 通过继承 threading.Thread 类并重写 run()方法的方式.

ok, 下面我们用面向对象式的方式对上面的代码进行改写:

importthreadingimporttimeclassListHtmlThread(threading.Thread):def__init__(self,url):super().__init__()self.url=url# 保存参数到实例属性defrun(self):print('开始获取列表页')time.sleep(2)# 模拟网络请求print('已经获取到列表页')classDetailHtmlThread(threading.Thread):def__init__(self,url):super().__init__()self.url=urldefrun(self):print('开始获取详情页')time.sleep(2)# 模拟网络请求print('已经获取到详情页')# 使用url='www.baidu.com't1=ListHtmlThread(url)t2=DetailHtmlThread(url)t1.start()t2.start()t1.join()t2.join()

聪明的老铁可能已经发现了, 其实 run()方法 就是在线程中运行的代码体, 也就相当于 函数式创建线程时的 传给target=的那个函数.

之前说过 daemon线程 的概念(也叫守护线程), 前面函数式代码运行也是一样, 主线程得等2秒后子线程结束, 主线程才能结束.

这是因为:

Python 程序中当所有的 非daemon线程 都结束了, 整个程序才会结束.

主线程是非daemon线程, 启动的子线程缺省也是 非daemon线程.

所以, 要等到主线程和子线程都结束, 程序才会结束.

我们可以在创建子线程的时候, 设置daemon参数值为 True, 比如之前的代码改写:

# 方式一: 在定义实例对象的时候设置 daemon=Truethread=Thread(target=threadFunc,daemon=True# 设置新线程为daemon线程)# 方式二: 在线程启动前调用 .setDeamon(True)方法thread=Thread(target=threadFunc)thread.setDaemon(True)# 需要注意的点: 守护线程要在线程启动之前设置thread.start()

好的, 理解了上面函数式的, 那么面向对象式的就很简单了, 只要这样:

...t1=ListHtmlThread(url)t2=DetailHtmlThread(url)t3.setDaemon(True)t4.setDaemon(True)t1.start()t2.start()

线程间的通信

在之前的文章中我们有说过 通过锁机制来对共享数据的访问控制, 忘记的老铁可以再回顾一下:

31天Python入门——第24天:挑战一口气把多线程和多进程讲明白

好的, 现在我们知道:

在多线程环境中, 所有线程共享同一个进程的内存空间. 但是由于线程切换得非常快, 因此多个线程同时访问和修改共享资源时, 很容易出现数据竞争问题, 这就可能发生数据不一致的错误.

为了避免这种问题, 需要使用锁或其他的同步机制 来确保同一时间只有一个线程可以访问共享资源.

接下来, 我们使用本身就是线程安全的数据结构 Queue, 通过它来代替共享数据的访问.

ok, 下面有一个生产者消费者模型 (生产者线程在生产, 消费者线程在消费) :

importthreadingimporttimefromqueueimportQueue,Empty# 通过安全的队列来通信.list_status=False# 生产者classGetListHtml(threading.Thread):def__init__(self,url_queue:Queue):super(GetListHtml,self).__init__()self.url_queue=url_queuedefrun(self):print('开始获取列表页')try:foriinrange(10):time.sleep(0.5)# 模拟网络请求.url=f'www.yrx.com/{i}'# Queue.join()里有一个计数器, 这一个计数器会在put的时候加1. 在每一次task_done的时候减1self.url_queue.put(url)print('已经获取到获取列表页')finally:globallist_status list_status=True# 消费者classGetDetailHtml(threading.Thread):def__init__(self,url_queue:Queue):super(GetDetailHtml,self).__init__()self.url_queue=url_queuedefrun(self):print('开始获取详情页')whileTrue:try:url=self.url_queue.get(timeout=1)# Queue.join()里有一个计数器, 这一个计数器会在put的时候加1. 在每一次task_done的时候减1self.url_queue.task_done()print(f'已经获取到获取详情页-{url}')time.sleep(1)# 模拟网络请求.# 当消费者消费比生产者快的时候,Queue里面的一定会出现为空的情况, 这时候消费者再来消费会出现出现Empty异常exceptEmpty:# 想一个办法, 让程序知道生产者已经结束了.globallist_statusprint(list_status)iflist_status:print('详情页已抓完')breakif__name__=='__main__':start_time=time.time()url_queue=Queue(100)# 一个生产者t1=GetListHtml(url_queue)# 两个消费者t2=GetDetailHtml(url_queue)t3=GetDetailHtml(url_queue)# 创建线程并启动t1.start()t2.start()t3.start()# 主线程阻塞等待子线程执行完毕t1.join()t2.join()t3.join()# join: 阻塞队列. 在主线程阻塞.# join里有一个计数器, 这一个计数器会在put的时候加1. 在每一次task_done的时候减1.# 当这个计数器是0的时候. 它会释放这个阻塞.url_queue.join()print(time.time()-start_time)

运行结果是:

开始获取列表页 开始获取详情页 开始获取详情页 已经获取到获取列表页 已经获取到获取详情页-www.yrx.com/0 False 已经获取到获取列表页 已经获取到获取详情页-www.yrx.com/1 已经获取到获取列表页 已经获取到获取详情页-www.yrx.com/2 已经获取到获取列表页已经获取到获取详情页-www.yrx.com/3 已经获取到获取列表页已经获取到获取详情页-www.yrx.com/4 已经获取到获取列表页 已经获取到获取详情页-www.yrx.com/5 已经获取到获取列表页已经获取到获取详情页-www.yrx.com/6 已经获取到获取列表页 已经获取到获取详情页-www.yrx.com/7 已经获取到获取列表页已经获取到获取详情页-www.yrx.com/8 已经获取到获取列表页已经获取到获取详情页-www.yrx.com/9 True 详情页已抓完 True 详情页已抓完 7.050168991088867

并发控制-Semaphore

先看下面的代码:

importthreadingimporttimeclassGetHtml(threading.Thread):def__init__(self,url):super().__init__()self.url=urldefrun(self):time.sleep(2)print(f'获取详情页成功:{self.url}')classSendUrl(threading.Thread):def__init__(self):super().__init__()defrun(self):foriinrange(30):url='www.yrx.com/{}'.format(i)get_html_thread=GetHtml(url=url)get_html_thread.start()if__name__=='__main__':send_url=SendUrl()send_url.start()

运行结果是:

...... 获取详情页成功: www.yrx.com/9 获取详情页成功: www.yrx.com/11 获取详情页成功: www.yrx.com/12 获取详情页成功: www.yrx.com/19 获取详情页成功: www.yrx.com/23 获取详情页成功: www.yrx.com/22 获取详情页成功: www.yrx.com/28 获取详情页成功: www.yrx.com/29

如果 for i in range(30): 创建的线程数更多呢, 比如是 30000, 那么代码是有问题的, 系统很可能因为不会创建这么多线程而出现异常错误, 所以这个时候, 我们就需要使用 Semaphore 对并发数进行控制.

好的, 接下来我们使用 Semaphore 来控制并发.

改写代码如下:

# Semaphore, 用于控制并发, 锁.importthreadingimporttimeclassGetHtml(threading.Thread):def__init__(self,url,semaphore):super().__init__()self.url=url self.semaphore=semaphoredefrun(self):time.sleep(2)print(f'获取详情页成功:{self.url}')# 释放锁的位置self.semaphore.release()classSendUrl(threading.Thread):def__init__(self,semaphore:threading.Semaphore):super().__init__()self.semaphore=semaphoredefrun(self):foriinrange(30000):url='www.yrx.com/{}'.format(i)# 加锁的位置self.semaphore.acquire()get_html_thread=GetHtml(url=url,semaphore=self.semaphore)get_html_thread.start()if__name__=='__main__':# 使用 Semaphore 来控制并发数每次是5个线程semaphore=threading.Semaphore(5)send_url=SendUrl(semaphore)send_url.start()

使用 threading.Semaphore(5) 对代码改写了之后, 即使在循环里设置了30000个线程, 每次并发数都是5个.

线程池

前面已经说了多线程, 为什么还需要线程池呢?

这是因为多线程在一些场景下有不足的地方, 我们先来看看 线程的执行顺序:

  1. 调用 start() 新建线程 ——> 准备就绪状态 ——> 等待操作系统的调度, CPU资源 (消耗资源, 系统资源, CPU, 内存)
  2. 遇到 IO 操作时, GIL会释放, 线程会切换, 这样的话又会执行上面的(调用start()新建线程…)
  3. 调用 run()方法执行完线程里的代码 (回收资源)

这样一来, 创建线程数很多的话, 频繁消耗资源、回收资源导致效率很低, 如果想要避免上面的问题, 可以看看下面线程池的实现.

首先, 线程池的好处是:

  1. 提升性能, 重用资源, 有效地避免了创建线程过多.
  2. 主线程中可以获取到某1个线程的返回值, 还可以获取线程的运行状态.
  3. 接口一致 (线程、进程和协程使用了一样的设计模式 .Future), 学习成本更低, 代码维护成本低.

ok, 有了上面的概念, 我们来看看这段线程池的基础代码:

importtimefromconcurrent.futuresimportThreadPoolExecutordeftask_cost_time(times):time.sleep(times)print(f'程序花费了{times}秒')returntimes executor=ThreadPoolExecutor(3)task1=executor.submit(task_cost_time,3)task2=executor.submit(task_cost_time,2)

我们直接运行上面的代码, 会得到:

程序花费了2秒 程序花费了3秒

需要注意的是, submit()方法是里是自动执行start()启动线程的, submit()方法的返回值是 Future 对象, 并且 submit()方法是不阻塞的. 也就是说, 主线程并不会 join 等待上面的两个线程执行结束才会执行后面的代码.

done()

我们使用 done()方法来判断线程是否执行完毕:

...print(task1.done())time.sleep(3.1)print(task1.done())

运行结果是:

False 程序花费了2秒 程序花费了3秒 True

result()

主线程使用 result()方法 可以获取到某一个线程的返回值.

...print(111111111)print(task1.result())print(222222222)

运行结果是:

111111111 程序花费了2秒 程序花费了3秒 3 222222222

我们看到只有执行了 print(task1.result()) 得到了返回值3, 才打印 222222222, 也就是说, result()方法是阻塞的.

cancel()

使用 cancel()方法可以取消线程, 但是处于正在运行状态或者已完成的线程, 不允许取消.

我们将前面的代码改写:

...deftask_cost_time(times):time.sleep(times)print(f'程序花费了{times}秒')returntimes executor=ThreadPoolExecutor(1)task1=executor.submit(task_cost_time,3)task2=executor.submit(task_cost_time,2)print(task1.cancel())print(task2.cancel())

运行结果是:

False True 程序花费了3秒

此时并没有执行 task2, 它被取消运行了.

我们看一下 cancel()方法的源码, 可以看出:

处于正在运行状态或者已完成的线程, 不允许取消.

defcancel(self):"""Cancel the future if possible. Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed. """withself._condition:ifself._statein[RUNNING,FINISHED]:returnFalseifself._statein[CANCELLED,CANCELLED_AND_NOTIFIED]:returnTrueself._state=CANCELLED self._condition.notify_all()self._invoke_callbacks()returnTrue

as_completed()

as_completed()方法返回的是生成器(特殊的迭代器).

importthreadingimporttimefromconcurrent.futuresimportThreadPoolExecutor,as_completeddeftask_cost_time(times):time.sleep(times)print(f'程序花费了{times}秒')returntimes executor=ThreadPoolExecutor(3)time_list=[3,2,4]# tasks: 一个Future的列表.tasks=[executor.submit(task_cost_time,t)fortintime_list]# as_completed, fs就是Futuresforfinas_completed(tasks):print(f.result())

运行结果是:

程序花费了2秒 2 程序花费了3秒 3 程序花费了4秒 4

我们发现 as_completed()方法 先请求好的先给出结果.

map()

map()方法和 as_completed()方法的功能很相似, 但是map()方法不是先请求好的先给出结果, 而是按照生成的 Future列表中的Future对象的顺序执行.

importthreadingimporttimefromconcurrent.futuresimportThreadPoolExecutordeftask_cost_time(times):time.sleep(times)print(f'程序花费了{times}秒')returntimes executor=ThreadPoolExecutor(3)time_list=[3,2,4]tasks=executor.map(task_cost_time,time_list)forfintasks:print(f)

运行结果是:

程序花费了2秒 程序花费了3秒 3 2 程序花费了4秒 4

map()方法的源码是:

defmap(self,fn,*iterables,timeout=None,chunksize=1):iftimeoutisnotNone:end_time=timeout+time.monotonic()fs=[self.submit(fn,*args)forargsinzip(*iterables)]# Yield must be hidden in closure so that the futures are submitted# before the first iterator value is required.defresult_iterator():try:# reverse to keep finishing orderfs.reverse()whilefs:# Careful not to keep a reference to the popped futureiftimeoutisNone:yieldfs.pop().result()else:yieldfs.pop().result(end_time-time.monotonic())finally:forfutureinfs:future.cancel()returnresult_iterator()

其中 fs.reverse() 是将 Future列表中的Future对象反转过来, 这样在执行后面的 yield fs.pop().result() 代码时, 会按照生成的 Future列表中的Future对象的顺序执行(因为前面说过result()方法是阻塞执行的).

wait()

如果我们想要实现类似于之前的代码中 join()方法的功能, 比如主线程要等到子线程全部执行完毕之后, 才能执行主线程后面的代码.

这个时候可以使用 wait()方法:

importtimefromconcurrent.futuresimportThreadPoolExecutor,waitdeftask_cost_time(times):time.sleep(times)print(f'程序花费了{times}秒')returntimes executor=ThreadPoolExecutor(2)task1=executor.submit(task_cost_time,3)task2=executor.submit(task_cost_time,2)wait([task1,task2])print(111111)

运行结果是:

程序花费了2秒 程序花费了3秒 111111

我们来看看 wait()方法的定义:

defwait(fs,timeout=None,return_when=ALL_COMPLETED):

缺省情况下 return_when的值是ALL_COMPLETED, 也就是在所有的线程都执行完毕后才返回.

如果想要实现完成了一个线程就返回, 只需要这样实现:

...executor=ThreadPoolExecutor(2)task1=executor.submit(task_cost_time,3)task2=executor.submit(task_cost_time,2)wait([task1,task2],return_when=FIRST_COMPLETED)print(111111)

运行结果是:

程序花费了2秒 111111 程序花费了3秒

wait([task1, task2], return_when=FIRST_COMPLETED) 这个时候只要完成了一个线程就会返回.

with语句

之前有在Python魔法方法一文中的__enter____exit__方法里讲到这块的知识, 老铁们可以回顾一下:

  • 31天Python入门——第20天:魔法方法详解

with语句, 上下文管理器.

当一个类里定义了 enter方法和exit方法, 那么这个类就可以被叫做上下文管理器, 它遵循Python中的上下文管理器协议.

  1. __enter__(self)方法:
  • 在进入with语句代码块时被调用, 用于执行一些准备操作.
  • 可以返回一个值, 该值将被赋给as关键字之后的变量.
  1. __exit__(self, exc_type, exc_value, exc_traceback)方法:
  • 在退出with语句代码块时调用, 无论代码块是否发生异常.
  • 可以用于执行一些清理操作, 如资源的释放.
  • 如果代码块内没有异常发生, 参数值为 None, None, None.如果有异常, 参数包含异常类型、异常实例和跟踪信息.
  • 如果__exit__方法返回 True, 异常不会向上继续传播.返回 False 则异常会继续向上抛.

比如这段代码:

classMyRequest:def__enter__(self):print(11111111)returnselfdef__exit__(self,exc_type,exc_val,exc_tb):passwithMyRequest()asmr:print(mr)

运行结果是:

11111111 <__main__.MyRequest object at 0x7faed014c160>

也就是:

在进入with语句代码块时执行enter()方法, 打印11111111, 并返回了实例对象, 而且这个返回值会被赋给as关键字之后的变量mr.

如果with语句代码块中有异常错误:

classMyRequest:def__enter__(self):returnselfdef__exit__(self,exc_type,exc_val,exc_tb):print(2222222)# return TruewithMyRequest()asmr:1/0

运行结果是:

2222222 ZeroDivisionError: division by zero

如果不想再向上抛出异常, 只需要在 exit()方法中返回True.

遇见安然遇见你,不负代码不负卿。
谢谢老铁的时间,咱们下篇再见~
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/21 15:13:38

用 Python 批量采集清洗电商数据,副业月入 2 w+ 的完整流程

0. 序章:写代码是加法,搞数据是乘法 很多程序员有一个误区:以为“副业”就是下班后接外包。 错了。接外包本质上还是出卖时间,还是在打工。真正的副业,是构建一套 24 小时自动运行的系统,让系统为你打工。 电商数据,就是当下最大的金矿。 跨境卖家需要选品数据:亚马逊…

作者头像 李华
网站建设 2026/4/18 7:44:12

Node.js用assert.strict做严格断言的实用技巧

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 目录Node.js中使用assert.strict进行严格断言的实用技巧与深度解析 为何“严格”是现代JavaScript测试的刚需&#xff1f; 核心A…

作者头像 李华
网站建设 2026/4/23 11:10:01

‌CI/CD中的“测试优先级调度”:先跑高风险用例

测试优先级调度的核心价值‌ 在持续集成/持续交付&#xff08;CI/CD&#xff09;流程中&#xff0c;测试是确保软件质量的核心闸门。然而&#xff0c;随着系统复杂度增加&#xff0c;全量测试往往耗时冗长&#xff0c;导致反馈延迟和发布瓶颈。测试优先级调度应运而生——它通…

作者头像 李华
网站建设 2026/4/17 17:31:19

‌CI/CD中的“测试环境版本管理”:和代码版本对齐

版本对齐不是技术选型问题&#xff0c;而是质量生命线‌ 在现代CI/CD流水线中&#xff0c;‌测试环境的版本必须与代码提交哈希&#xff08;Git Commit Hash&#xff09;严格绑定‌&#xff0c;任何偏离都将导致“测试漂移”——即测试结果无法反映真实代码行为。这不仅是流程…

作者头像 李华
网站建设 2026/4/18 21:40:42

TestOps实战:如何让测试成为“质量左移”的核心

质量左移的紧迫性与TestOps的崛起 在快速迭代的软件开发时代&#xff0c;“质量左移”&#xff08;Shift Left&#xff09;已成为行业共识——它强调将测试活动从传统的事后环节前置到需求分析、设计和编码阶段&#xff0c;从而提前暴露缺陷、降低修复成本。然而&#xff0c;许…

作者头像 李华
网站建设 2026/4/20 5:22:21

乐迪信息:防爆AI摄像机内置算法:集成船舶类型识别与烟火检测功能

这种新型的防爆摄像机不仅具备高效的防爆特性&#xff0c;更是通过内置先进的算法&#xff0c;实现了船舶类型的智能识别与烟火的实时检测。本文将全面探讨防爆AI摄像机的优势、工作原理、应用场景及未来发展前景。一&#xff1a;防爆AI摄像机概述防爆AI摄像机是专为高危环境设…

作者头像 李华