一、为什么需要多线程?
在 Python 中,多线程是一种并发编程技术,允许程序同时执行多个任务。它特别适合以下场景:
I/O 密集型任务:网络请求、文件读写、数据库操作等,线程在等待 I/O 时让出 CPU,提高效率。
用户界面响应:后台执行耗时操作时保持界面不卡顿。
并发服务处理:Web 服务器同时处理多个客户端请求。
注意:由于 Python 的 GIL(全局解释器锁)限制,多线程不适合CPU 密集型任务(如大量数学计算),这种情况应使用多进程。
二、Python 多线程核心模块
Python 提供两个主要模块处理多线程:
| 模块 | 特点 | 推荐场景 |
|---|---|---|
_thread | 低级别、功能简单 | 不推荐,仅作了解 |
threading | 高级别、功能丰富 | 实际开发首选 |
三、基础用法:创建线程
3.1 方法一:直接创建 Thread 对象
python
import threading import time def worker(name, delay): """线程执行的函数""" for i in range(3): print(f"线程 {name} 执行第 {i+1} 次,时间: {time.strftime('%H:%M:%S')}") time.sleep(delay) # 创建线程 t1 = threading.Thread(target=worker, args=("A", 1)) t2 = threading.Thread(target=worker, args=("B", 1.5)) # 启动线程 t1.start() t2.start() # 等待线程结束 t1.join() t2.join() print("所有线程执行完毕")3.2 方法二:继承 Thread 类
python
import threading import time class MyThread(threading.Thread): def __init__(self, name, delay): super().__init__() self.name = name self.delay = delay def run(self): """重写 run 方法,线程入口""" for i in range(3): print(f"线程 {self.name} 计数: {i+1}") time.sleep(self.delay) # 使用自定义线程类 t1 = MyThread("Worker-1", 1) t2 = MyThread("Worker-2", 1.5) t1.start() t2.start() t1.join() t2.join()3.3 设置线程为守护线程(Daemon)
守护线程在主线程结束时自动终止,适用于后台监控任务。
python
import threading import time def daemon_task(): while True: print("守护线程运行中...") time.sleep(2) daemon = threading.Thread(target=daemon_task, daemon=True) daemon.start() # 主线程 5 秒后结束,守护线程自动停止 time.sleep(5) print("主线程结束,守护线程随之停止")四、线程同步与锁
多线程共享全局变量可能引发数据竞争,需要使用同步机制。
4.1 Lock(互斥锁)
python
import threading # 共享资源 counter = 0 lock = threading.Lock() def increment(): global counter for _ in range(100000): with lock: # 推荐使用上下文管理器 counter += 1 # 或者使用手动加锁/解锁 def decrement(): global counter for _ in range(100000): lock.acquire() counter -= 1 lock.release() threads = [] for _ in range(5): t = threading.Thread(target=increment) t.start() threads.append(t) for t in threads: t.join() print(f"最终结果(应为 500000):{counter}")4.2 RLock(可重入锁)
同一线程可多次获取,适合递归或嵌套调用场景。
python
import threading rlock = threading.RLock() def recursive_function(n): if n <= 0: return with rlock: print(f"递归层级 {n},线程 {threading.current_thread().name}") recursive_function(n - 1) t = threading.Thread(target=recursive_function, args=(5,)) t.start() t.join()4.3 死锁示例与避免
python
# 死锁示例(错误代码) lock_a = threading.Lock() lock_b = threading.Lock() def task1(): with lock_a: time.sleep(0.1) # 模拟操作 with lock_b: print("task1 完成") def task2(): with lock_b: # 注意顺序与 task1 相反 time.sleep(0.1) with lock_a: print("task2 完成") # 使用超时避免死锁 def safe_task1(): if lock_a.acquire(timeout=1): if lock_b.acquire(timeout=1): print("safe_task1 完成") lock_b.release() lock_a.release()4.4 Semaphore(信号量)
控制同时访问资源的线程数量。
python
import threading import time # 最多允许 3 个线程同时执行 semaphore = threading.Semaphore(3) def limited_task(name): with semaphore: print(f"线程 {name} 开始执行,时间: {time.strftime('%H:%M:%S')}") time.sleep(2) print(f"线程 {name} 执行完毕") threads = [threading.Thread(target=limited_task, args=(i,)) for i in range(10)] for t in threads: t.start()4.5 Event(事件通知)
线程间发送信号,常用于等待某个条件达成。
python
import threading import time event = threading.Event() def waiter(): print("等待事件触发...") event.wait() # 阻塞等待 print("事件已触发,继续执行") def setter(): time.sleep(3) print("3秒后触发事件") event.set() # 触发事件 t1 = threading.Thread(target=waiter) t2 = threading.Thread(target=setter) t1.start() t2.start()五、线程间通信:Queue 队列
queue.Queue是线程安全的队列,无需手动加锁。
python
import threading import queue import time # 创建队列 q = queue.Queue(maxsize=5) def producer(): for i in range(10): item = f"数据-{i}" q.put(item) # 队列满时阻塞 print(f"生产: {item}") time.sleep(0.5) def consumer(): while True: item = q.get() # 队列空时阻塞 print(f"消费: {item}") q.task_done() # 标记任务完成 time.sleep(1) # 创建生产者线程和消费者线程 producer_thread = threading.Thread(target=producer) consumer_thread = threading.Thread(target=consumer, daemon=True) producer_thread.start() consumer_thread.start() # 等待所有队列任务完成 producer_thread.join() q.join() # 等待所有 task_done 被调用 print("所有任务处理完毕")队列类型选择:
| 队列类 | 特点 |
|---|---|
queue.Queue | FIFO 先进先出 |
queue.LifoQueue | LIFO 后进先出(栈) |
queue.PriorityQueue | 优先级队列 |
六、线程池:concurrent.futures
Python 3.2+ 推荐使用ThreadPoolExecutor管理线程池。
6.1 基础用法
python
from concurrent.futures import ThreadPoolExecutor, as_completed import time def task(n): time.sleep(1) return n * n # 创建包含 3 个线程的线程池 with ThreadPoolExecutor(max_workers=3) as executor: # 方法1:提交单个任务 future = executor.submit(task, 5) result = future.result() print(f"单个结果: {result}") # 方法2:批量提交 numbers = [1, 2, 3, 4, 5] futures = [executor.submit(task, n) for n in numbers] # 按完成顺序获取结果 for future in as_completed(futures): print(f"完成结果: {future.result()}")6.2 map 方法批量处理
python
from concurrent.futures import ThreadPoolExecutor def square(n): return n * n with ThreadPoolExecutor(max_workers=4) as executor: # map 保持输入顺序 results = executor.map(square, [1, 2, 3, 4, 5]) for result in results: print(result) # 支持多个参数 def add(a, b): return a + b with ThreadPoolExecutor() as executor: results = executor.map(add, [1, 2, 3], [10, 20, 30]) print(list(results)) # [11, 22, 33]
6.3 设置超时
python
with ThreadPoolExecutor() as executor: future = executor.submit(time.sleep, 10) try: result = future.result(timeout=3) except TimeoutError: print("任务执行超时") future.cancel() # 尝试取消(可能无效)七、全局解释器锁(GIL)说明
GIL 是 CPython 解释器的特性,确保同一时刻只有一个线程执行 Python 字节码。
python
import threading import time # CPU 密集型任务示例(多线程不会加速) def cpu_intensive(): total = 0 for i in range(50_000_000): total += i # 多线程 vs 单线程 性能对比 start = time.time() cpu_intensive() cpu_intensive() print(f"单线程耗时: {time.time() - start:.2f} 秒") start = time.time() t1 = threading.Thread(target=cpu_intensive) t2 = threading.Thread(target=cpu_intensive) t1.start() t2.start() t1.join() t2.join() print(f"多线程耗时: {time.time() - start:.2f} 秒") # 实际多线程可能比单线程更慢(由于 GIL 竞争)解决方案:
I/O 密集型 → 多线程
CPU 密集型 →
multiprocessing模块
八、实战示例:批量下载图片
python
import threading import requests import time from concurrent.futures import ThreadPoolExecutor # 模拟图片 URL 列表 urls = [f"https://picsum.photos/200/300?random={i}" for i in range(20)] def download_image(url, save_path): """下载单张图片""" try: response = requests.get(url, timeout=10) if response.status_code == 200: with open(save_path, 'wb') as f: f.write(response.content) print(f"下载成功: {save_path}") return True except Exception as e: print(f"下载失败 {url}: {e}") return False def batch_download(max_workers=5): start = time.time() with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [] for i, url in enumerate(urls): future = executor.submit(download_image, url, f"images/img_{i}.jpg") futures.append(future) # 等待所有任务完成 success_count = sum(1 for f in futures if f.result()) elapsed = time.time() - start print(f"下载完成,成功: {success_count}/{len(urls)},耗时: {elapsed:.2f} 秒") if __name__ == "__main__": batch_download()九、常见问题与最佳实践
9.1 常见陷阱
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 数据竞争 | 多个线程同时修改共享变量 | 使用 Lock 或 Queue |
| 死锁 | 线程互相等待对方释放资源 | 统一加锁顺序,使用 timeout |
| 线程泄漏 | 线程未正确 join 或设置为 daemon | 使用线程池自动管理 |
| 虚假唤醒 | wait() 被意外唤醒 | 使用 while 循环检查条件 |
9.2 最佳实践清单
python
# ✅ 推荐做法 # 1. 使用 with 语句管理锁 with lock: shared_data += 1 # 2. 优先使用 Queue 而非共享变量 from queue import Queue q = Queue() q.put(data) data = q.get() # 3. 使用线程池代替手动管理线程 with ThreadPoolExecutor(max_workers=10) as executor: executor.map(func, iterable) # 4. 设置守护线程避免程序无法退出 thread = threading.Thread(target=daemon_task, daemon=True) # ❌ 避免做法 # 1. 不要手动调用 _thread 模块 # 2. 不要在循环中频繁创建线程(使用线程池) # 3. 不要在持有锁时执行耗时操作 # 4. 不要以为多线程一定比单线程快(考虑 GIL)
十、多线程 vs 多进程 vs 异步 IO
| 方案 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
threading | I/O 密集型 | 轻量,共享内存 | GIL 限制 CPU 任务 |
multiprocessing | CPU 密集型 | 真正并行 | 内存开销大,通信复杂 |
asyncio | 高并发 I/O | 极高效率,单线程 | 学习曲线陡峭 |
选择指南:
Web 爬虫、API 调用 →
threading或asyncio图像处理、科学计算 →
multiprocessing混合任务 → 组合使用
总结
Python 多线程是解决 I/O 密集型并发问题的有力工具。掌握以下要点即可应对大部分场景:
基础:
threading.Thread创建线程,start()启动,join()等待同步:使用
Lock、RLock、Semaphore、Event协调线程通信:优先使用
queue.Queue替代共享变量线程池:
ThreadPoolExecutor管理线程生命周期认清 GIL:CPU 密集型任务换用多进程