news 2026/6/26 17:02:20

Python 多线程编程完全指南

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python 多线程编程完全指南

一、为什么需要多线程?

在 Python 中,多线程是一种并发编程技术,允许程序同时执行多个任务。它特别适合以下场景:

  • I/O 密集型任务:网络请求、文件读写、数据库操作等,线程在等待 I/O 时让出 CPU,提高效率。

  • 用户界面响应:后台执行耗时操作时保持界面不卡顿。

  • 并发服务处理:Web 服务器同时处理多个客户端请求。

注意:由于 Python 的 GIL(全局解释器锁)限制,多线程不适合CPU 密集型任务(如大量数学计算),这种情况应使用多进程。


二、Python 多线程核心模块

Python 提供两个主要模块处理多线程:

模块特点推荐场景
_thread低级别、功能简单不推荐,仅作了解
threading高级别、功能丰富实际开发首选

三、基础用法:创建线程

3.1 方法一:直接创建 Thread 对象

python

import threading import time def worker(name, delay): """线程执行的函数""" for i in range(3): print(f"线程 {name} 执行第 {i+1} 次,时间: {time.strftime('%H:%M:%S')}") time.sleep(delay) # 创建线程 t1 = threading.Thread(target=worker, args=("A", 1)) t2 = threading.Thread(target=worker, args=("B", 1.5)) # 启动线程 t1.start() t2.start() # 等待线程结束 t1.join() t2.join() print("所有线程执行完毕")

3.2 方法二:继承 Thread 类

python

import threading import time class MyThread(threading.Thread): def __init__(self, name, delay): super().__init__() self.name = name self.delay = delay def run(self): """重写 run 方法,线程入口""" for i in range(3): print(f"线程 {self.name} 计数: {i+1}") time.sleep(self.delay) # 使用自定义线程类 t1 = MyThread("Worker-1", 1) t2 = MyThread("Worker-2", 1.5) t1.start() t2.start() t1.join() t2.join()

3.3 设置线程为守护线程(Daemon)

守护线程在主线程结束时自动终止,适用于后台监控任务。

python

import threading import time def daemon_task(): while True: print("守护线程运行中...") time.sleep(2) daemon = threading.Thread(target=daemon_task, daemon=True) daemon.start() # 主线程 5 秒后结束,守护线程自动停止 time.sleep(5) print("主线程结束,守护线程随之停止")

四、线程同步与锁

多线程共享全局变量可能引发数据竞争,需要使用同步机制。

4.1 Lock(互斥锁)

python

import threading # 共享资源 counter = 0 lock = threading.Lock() def increment(): global counter for _ in range(100000): with lock: # 推荐使用上下文管理器 counter += 1 # 或者使用手动加锁/解锁 def decrement(): global counter for _ in range(100000): lock.acquire() counter -= 1 lock.release() threads = [] for _ in range(5): t = threading.Thread(target=increment) t.start() threads.append(t) for t in threads: t.join() print(f"最终结果(应为 500000):{counter}")

4.2 RLock(可重入锁)

同一线程可多次获取,适合递归或嵌套调用场景。

python

import threading rlock = threading.RLock() def recursive_function(n): if n <= 0: return with rlock: print(f"递归层级 {n},线程 {threading.current_thread().name}") recursive_function(n - 1) t = threading.Thread(target=recursive_function, args=(5,)) t.start() t.join()

4.3 死锁示例与避免

python

# 死锁示例(错误代码) lock_a = threading.Lock() lock_b = threading.Lock() def task1(): with lock_a: time.sleep(0.1) # 模拟操作 with lock_b: print("task1 完成") def task2(): with lock_b: # 注意顺序与 task1 相反 time.sleep(0.1) with lock_a: print("task2 完成") # 使用超时避免死锁 def safe_task1(): if lock_a.acquire(timeout=1): if lock_b.acquire(timeout=1): print("safe_task1 完成") lock_b.release() lock_a.release()

4.4 Semaphore(信号量)

控制同时访问资源的线程数量。

python

import threading import time # 最多允许 3 个线程同时执行 semaphore = threading.Semaphore(3) def limited_task(name): with semaphore: print(f"线程 {name} 开始执行,时间: {time.strftime('%H:%M:%S')}") time.sleep(2) print(f"线程 {name} 执行完毕") threads = [threading.Thread(target=limited_task, args=(i,)) for i in range(10)] for t in threads: t.start()

4.5 Event(事件通知)

线程间发送信号,常用于等待某个条件达成。

python

import threading import time event = threading.Event() def waiter(): print("等待事件触发...") event.wait() # 阻塞等待 print("事件已触发,继续执行") def setter(): time.sleep(3) print("3秒后触发事件") event.set() # 触发事件 t1 = threading.Thread(target=waiter) t2 = threading.Thread(target=setter) t1.start() t2.start()

五、线程间通信:Queue 队列

queue.Queue是线程安全的队列,无需手动加锁。

python

import threading import queue import time # 创建队列 q = queue.Queue(maxsize=5) def producer(): for i in range(10): item = f"数据-{i}" q.put(item) # 队列满时阻塞 print(f"生产: {item}") time.sleep(0.5) def consumer(): while True: item = q.get() # 队列空时阻塞 print(f"消费: {item}") q.task_done() # 标记任务完成 time.sleep(1) # 创建生产者线程和消费者线程 producer_thread = threading.Thread(target=producer) consumer_thread = threading.Thread(target=consumer, daemon=True) producer_thread.start() consumer_thread.start() # 等待所有队列任务完成 producer_thread.join() q.join() # 等待所有 task_done 被调用 print("所有任务处理完毕")

队列类型选择

队列类特点
queue.QueueFIFO 先进先出
queue.LifoQueueLIFO 后进先出(栈)
queue.PriorityQueue优先级队列

六、线程池:concurrent.futures

Python 3.2+ 推荐使用ThreadPoolExecutor管理线程池。

6.1 基础用法

python

from concurrent.futures import ThreadPoolExecutor, as_completed import time def task(n): time.sleep(1) return n * n # 创建包含 3 个线程的线程池 with ThreadPoolExecutor(max_workers=3) as executor: # 方法1:提交单个任务 future = executor.submit(task, 5) result = future.result() print(f"单个结果: {result}") # 方法2:批量提交 numbers = [1, 2, 3, 4, 5] futures = [executor.submit(task, n) for n in numbers] # 按完成顺序获取结果 for future in as_completed(futures): print(f"完成结果: {future.result()}")

6.2 map 方法批量处理

python

from concurrent.futures import ThreadPoolExecutor def square(n): return n * n with ThreadPoolExecutor(max_workers=4) as executor: # map 保持输入顺序 results = executor.map(square, [1, 2, 3, 4, 5]) for result in results: print(result) # 支持多个参数 def add(a, b): return a + b with ThreadPoolExecutor() as executor: results = executor.map(add, [1, 2, 3], [10, 20, 30]) print(list(results)) # [11, 22, 33]

6.3 设置超时

python

with ThreadPoolExecutor() as executor: future = executor.submit(time.sleep, 10) try: result = future.result(timeout=3) except TimeoutError: print("任务执行超时") future.cancel() # 尝试取消(可能无效)

七、全局解释器锁(GIL)说明

GIL 是 CPython 解释器的特性,确保同一时刻只有一个线程执行 Python 字节码。

python

import threading import time # CPU 密集型任务示例(多线程不会加速) def cpu_intensive(): total = 0 for i in range(50_000_000): total += i # 多线程 vs 单线程 性能对比 start = time.time() cpu_intensive() cpu_intensive() print(f"单线程耗时: {time.time() - start:.2f} 秒") start = time.time() t1 = threading.Thread(target=cpu_intensive) t2 = threading.Thread(target=cpu_intensive) t1.start() t2.start() t1.join() t2.join() print(f"多线程耗时: {time.time() - start:.2f} 秒") # 实际多线程可能比单线程更慢(由于 GIL 竞争)

解决方案

  • I/O 密集型 → 多线程

  • CPU 密集型 →multiprocessing模块


八、实战示例:批量下载图片

python

import threading import requests import time from concurrent.futures import ThreadPoolExecutor # 模拟图片 URL 列表 urls = [f"https://picsum.photos/200/300?random={i}" for i in range(20)] def download_image(url, save_path): """下载单张图片""" try: response = requests.get(url, timeout=10) if response.status_code == 200: with open(save_path, 'wb') as f: f.write(response.content) print(f"下载成功: {save_path}") return True except Exception as e: print(f"下载失败 {url}: {e}") return False def batch_download(max_workers=5): start = time.time() with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [] for i, url in enumerate(urls): future = executor.submit(download_image, url, f"images/img_{i}.jpg") futures.append(future) # 等待所有任务完成 success_count = sum(1 for f in futures if f.result()) elapsed = time.time() - start print(f"下载完成,成功: {success_count}/{len(urls)},耗时: {elapsed:.2f} 秒") if __name__ == "__main__": batch_download()

九、常见问题与最佳实践

9.1 常见陷阱

问题原因解决方案
数据竞争多个线程同时修改共享变量使用 Lock 或 Queue
死锁线程互相等待对方释放资源统一加锁顺序,使用 timeout
线程泄漏线程未正确 join 或设置为 daemon使用线程池自动管理
虚假唤醒wait() 被意外唤醒使用 while 循环检查条件

9.2 最佳实践清单

python

# ✅ 推荐做法 # 1. 使用 with 语句管理锁 with lock: shared_data += 1 # 2. 优先使用 Queue 而非共享变量 from queue import Queue q = Queue() q.put(data) data = q.get() # 3. 使用线程池代替手动管理线程 with ThreadPoolExecutor(max_workers=10) as executor: executor.map(func, iterable) # 4. 设置守护线程避免程序无法退出 thread = threading.Thread(target=daemon_task, daemon=True) # ❌ 避免做法 # 1. 不要手动调用 _thread 模块 # 2. 不要在循环中频繁创建线程(使用线程池) # 3. 不要在持有锁时执行耗时操作 # 4. 不要以为多线程一定比单线程快(考虑 GIL)

十、多线程 vs 多进程 vs 异步 IO

方案适用场景优点缺点
threadingI/O 密集型轻量,共享内存GIL 限制 CPU 任务
multiprocessingCPU 密集型真正并行内存开销大,通信复杂
asyncio高并发 I/O极高效率,单线程学习曲线陡峭

选择指南

  • Web 爬虫、API 调用 →threadingasyncio

  • 图像处理、科学计算 →multiprocessing

  • 混合任务 → 组合使用


总结

Python 多线程是解决 I/O 密集型并发问题的有力工具。掌握以下要点即可应对大部分场景:

  1. 基础threading.Thread创建线程,start()启动,join()等待

  2. 同步:使用LockRLockSemaphoreEvent协调线程

  3. 通信:优先使用queue.Queue替代共享变量

  4. 线程池ThreadPoolExecutor管理线程生命周期

  5. 认清 GIL:CPU 密集型任务换用多进程

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/23 19:45:09

slam_toolbox-ros-melodic实战:从部署到调优的完整避坑指南

1. 环境准备与安装指南 在ROS Melodic上部署slam_toolbox前&#xff0c;需要确保系统环境满足以下条件。我曾在TurtleBot3和自定义移动机器人上反复验证过这套配置流程&#xff0c;总结出几个关键要点&#xff1a; 系统基础环境&#xff1a; Ubuntu 18.04 LTS&#xff08;ROS M…

作者头像 李华
网站建设 2026/6/23 19:46:04

【Perplexity开源项目搜索实战指南】:20年资深工程师亲授3大检索黑科技,90%开发者还不知道的隐藏技巧

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;Perplexity开源项目搜索实战指南导论 Perplexity 是一个基于语义理解与实时网络检索能力构建的开源搜索增强框架&#xff0c;其核心价值在于将大语言模型的推理能力与精准、可追溯的代码级资源发现深度…

作者头像 李华
网站建设 2026/6/23 19:34:12

红米K70 Pro澎湃OS深度解锁:Delta面具Root实战指南与BL解锁设备适配

1. 红米K70 Pro Root前的准备工作 给手机获取Root权限就像给房子装修前要准备工具一样&#xff0c;需要做好万全准备。我去年给三台不同型号的小米手机Root过&#xff0c;红米K70 Pro的澎湃OS系统确实有些特殊注意事项。首先确认你的手机已经解锁BootLoader&#xff08;俗称BL锁…

作者头像 李华
网站建设 2026/6/23 19:34:13

Windows系统上突破性的安卓应用运行方案:APK安装器的技术革命

Windows系统上突破性的安卓应用运行方案&#xff1a;APK安装器的技术革命 【免费下载链接】APK-Installer An Android Application Installer for Windows 项目地址: https://gitcode.com/GitHub_Trending/ap/APK-Installer 你是否厌倦了在手机小屏幕上操作安卓应用&…

作者头像 李华
网站建设 2026/6/23 19:34:08

「1图1表格」从Mallory到Masson染色:洞察肌纤维免疫组化三颜色的奥秘

如果你着急去染色&#xff0c;看完图片保存好&#xff0c;就撤吧。如果你认为内容有帮助&#xff0c;点个赞或推荐再走吧。 接下来我们将会开启“废话”模式&#xff0c;在图片内容的基础上&#xff0c;唠一唠Masson三色染色法的历史、染色原理、核心操作步骤的关键机制。 一、…

作者头像 李华
网站建设 2026/6/23 19:34:11

ARM966E-S r2处理器勘误解析与解决方案

1. ARM966E-S r2处理器勘误全景解析在嵌入式系统开发领域&#xff0c;处理器核心的稳定性直接决定整个系统的可靠性。作为ARM9E系列中的经典产品&#xff0c;ARM966E-S r2处理器广泛应用于工业控制、汽车电子等对实时性要求严格的场景。但在实际工程应用中&#xff0c;硬件层面…

作者头像 李华