第三部分:多线程
1. 线程基础理论
1.1 线程的定义与组成
1.1.1 线程的定义
- 狭义定义:线程是进程中的一个执行单元,是 CPU 调度和分派的基本单位。
- 广义定义:线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。一个进程中可以并发多个线程,每条线程并行执行不同的任务。
1.1.2 线程与进程的区别
- 进程是资源分配的基本单位,拥有独立的地址空间。
- 线程是 CPU 调度的基本单位,共享进程的地址空间。
一个进程可以包含多个线程,这些线程共享进程的资源(内存、文件描述符等),但每个线程有自己独立的执行栈和程序计数器。
1.1.3 线程的组成
一个线程主要由以下部分组成:
- 线程 ID(TID):线程的唯一标识符,在进程内唯一。
- 程序计数器(PC):指向线程当前执行的指令地址。
- 寄存器集合:保存线程的工作变量和中间结果。
- 栈(Stack):存储线程的局部变量、函数调用信息和返回地址,每个线程有独立的栈。
- 线程控制块(TCB):类似于进程的 PCB,记录线程的状态信息、优先级等。
线程共享的进程资源:
- 代码段(Text Segment)
- 数据段(Data Segment)
- BSS 段
- 堆区(Heap)
- 文件描述符表
- 信号处理器
- 进程 ID 和进程组 ID
1.2 线程控制块与内存模型
1.2.1 线程控制块(TCB)
线程控制块(Thread Control Block,TCB)是操作系统用于管理线程的数据结构。每个线程都有一个 TCB,它包含:
- 线程标识信息:
- 线程 ID(TID)
- 所属进程 ID(PID)
- 线程组 ID
- 处理器状态信息:
- 程序计数器(PC)
- CPU 寄存器内容
- 栈指针(SP)
- 线程调度信息:
- 线程状态(新建、就绪、运行、阻塞、终止)
- 线程优先级
- 调度策略参数
- 栈信息:
- 用户栈指针
- 内核栈指针
- 栈大小限制
1.2.2 线程内存布局
线程在进程内存空间中的布局特点:
线程独有的内存区域:
- 线程栈:每个线程有自己的栈空间,存储局部变量和函数调用信息
- 线程局部存储(TLS):线程私有的全局变量存储区域
- 寄存器状态:每个线程独立的 CPU 寄存器值
线程共享的内存区域:
- 代码段:所有线程执行相同的程序代码
- 数据段和 BSS 段:全局变量和静态变量
- 堆区:动态分配的内存
- 内存映射区:共享库、文件映射等
1.2.3 线程上下文切换
线程上下文切换比进程上下文切换更轻量,主要步骤:
- 保存当前线程的 CPU 寄存器状态
- 更新当前线程的 TCB 信息
- 选择下一个要执行的线程
- 恢复被选中线程的 CPU 寄存器状态
- 切换栈指针到新线程的栈
- 跳转到新线程的执行点
由于线程共享地址空间,不需要切换页表和刷新 TLB,所以比进程切换更快。
1.3 线程标识符与层次关系
1.3.1 线程标识符(TID)
每个线程都有一个唯一的线程 ID(TID),在进程内唯一标识该线程。与进程的 PID 类似,TID 用于:
- 线程调度和管理
- 线程间通信和同步
- 调试和监控
在 Linux 系统中,线程实际上是轻量级进程(LWP),每个线程都有系统级的 TID。
1.3.2 主线程与工作线程
主线程(Main Thread):
- 进程创建时自动创建的第一个线程
- 负责进程的初始化和其他线程的创建
- 主线程结束通常意味着整个进程结束
- 拥有进程的主要执行流
工作线程(Worker Thread):
- 由主线程或其他线程创建的辅助线程
- 执行特定的任务或功能
- 可以是守护线程或非守护线程
- 生命周期可以独立于主线程(非守护线程)或依赖于主线程(守护线程)
1.4 线程状态与调度
1.4.1 线程状态
线程的生命周期包含以下几种状态:
五种基本状态:
- 新建状态(New):线程对象被创建但尚未启动
- 就绪状态(Runnable):线程可以运行,正在等待 CPU 时间片
- 运行状态(Running):线程获得 CPU,正在执行
- 阻塞状态(Blocked):线程因等待某个条件而暂停执行
- 终止状态(Terminated):线程执行完毕或被强制终止
1.4.2 线程调度
线程调度通常由操作系统的线程调度器负责,主要策略与进程调度类似:
用户级线程 vs 内核级线程:
- 用户级线程:
- 由用户空间的线程库管理
- 切换快速,不需要内核参与
- 一个线程阻塞会导致整个进程阻塞
- Python 的早期线程实现
- 内核级线程:
- 由操作系统内核直接管理
- 切换需要系统调用,开销较大
- 真正的并发执行(在多核系统上)
- 现代 Python 线程的实现方式
Python 线程调度特点:
- 使用操作系统原生线程(1:1 模型)
- 受 GIL(全局解释器锁)限制(除 Python 3.13+自由线程模式)
- 在 I/O 操作时会释放 GIL,允许其他线程运行
- CPU 密集型任务无法真正并行(在带 GIL 的 Python 版本中)
1.5 线程 vs 进程对比总结
| 特性 | 进程 | 线程 |
|---|---|---|
| 定义 | 资源分配的基本单位 | CPU 调度的基本单位 |
| 地址空间 | 独立的地址空间 | 共享进程的地址空间 |
| 资源开销 | 创建销毁开销大 | 创建销毁开销小 |
| 内存占用 | 独立内存,占用较大 | 共享内存,占用较小 |
| 通信方式 | IPC(管道、队列、共享内存等) | 直接读写共享变量 |
| 上下文切换 | 慢(需切换地址空间) | 快(同一地址空间) |
| 并发性 | 真正的并行(多核) | Python 中受 GIL 限制(传统版本) |
| 稳定性 | 一个进程崩溃不影响其他 | 一个线程崩溃可能影响整个进程 |
| 数据共享 | 显式通信,安全性高 | 隐式共享,需要同步 |
| 适用场景 | CPU 密集型、需要隔离 | I/O 密集型、需要共享数据 |
2. 特殊线程类型
2.1 守护线程深入解析
守护线程(Daemon Thread)是一种在后台运行的特殊线程,主要用于执行辅助性任务。
2.1.1 守护线程的特点
- 生命周期依赖:守护线程的生命周期依赖于非守护线程,当所有非守护线程结束时,守护线程会被强制终止
- 后台运行:通常用于执行后台任务,如垃圾回收、日志记录、监控等
- 自动清理:程序退出时不需要等待守护线程完成
- 不保证完成:由于会被强制终止,守护线程的任务可能无法完成
2.1.2 守护线程与普通线程对比
| 特性 | 普通线程 | 守护线程 |
|---|---|---|
| 生命周期 | 独立运行直到完成 | 随非守护线程结束而终止 |
| 程序退出 | 程序会等待其完成 | 程序不等待,直接终止 |
| 任务保证 | 保证任务完成 | 不保证任务完成 |
| 典型用途 | 主要业务逻辑 | 辅助任务、监控、日志 |
| 设置方式 | 默认daemon=False | thread.daemon=True |
2.2 主线程与工作线程
2.2.1 主线程特性
- 自动创建:程序启动时自动创建的第一个线程
- 程序入口:执行程序的主要逻辑
- 管理职责:负责创建和管理其他线程
- 退出影响:主线程结束可能导致整个程序退出(取决于是否有非守护线程)
2.2.2 工作线程特性
- 按需创建:根据任务需要创建
- 专门任务:通常负责特定的功能模块
- 并发执行:多个工作线程可以并发执行
- 资源共享:与主线程共享进程资源
3. 线程创建与管理
3.1 threading.Thread 的工作原理
threading.Thread 是 Python 中创建和管理线程的主要类,它封装了底层的线程操作。
3.1.1 Thread 类工作机制
当调用 Thread.start() 时的执行流程:
- 创建新的系统线程
- 在新线程中调用
run()方法 run()方法执行target指定的函数- 函数执行完毕,线程自动终止
3.1.2 基础示例
import threading
import time
def worker_task(name, duration):
"""工作线程任务"""
print(f"线程 {name} 开始")
for i in range(duration):
time.sleep(1)
print(f"线程 {name} 工作中... {i+1}/{duration}")
print(f"线程 {name} 完成")
if __name__ == '__main__':
print(f"主线程开始")
# 创建线程
thread = threading.Thread(
target=worker_task,
args=("Worker-1", 3)
)
print("启动工作线程...")
thread.start()
print("主线程继续执行")
print("等待工作线程完成...")
thread.join()
print("所有任务完成")3.1.3 Thread 类的方法和属性
常用方法:
| 方法名 | 描述 |
|---|---|
t.start() | 启动线程,调用 run()方法 |
t.run() | 线程启动时执行的方法 |
t.join([timeout]) | 等待线程结束,可指定超时时间 |
t.is_alive() | 检查线程是否还在运行 |
常用属性:
| 属性名 | 描述 |
|---|---|
t.name | 线程名称 |
t.ident | 线程标识符(启动后才有) |
t.daemon | 是否为守护线程 |
t.native_id | 系统原生线程 ID(Python 3.8+) |
3.2 自定义 Thread 类
通过继承 threading.Thread 类可以创建自定义线程类,便于封装复杂的线程逻辑。
3.2.1 继承 Thread 类
import threading
import time
class CountThread(threading.Thread):
"""计数线程类示例"""
def __init__(self, name, count):
super().__init__(name=name)
self.count = count
def run(self):
"""重写run方法"""
for i in range(self.count):
print(f"{self.name}: {i}")
time.sleep(0.5)
print(f"{self.name} 完成计数")
if __name__ == '__main__':
# 创建并启动线程
t1 = CountThread("Thread-1", 3)
t2 = CountThread("Thread-2", 3)
t1.start()
t2.start()
t1.join()
t2.join()
print("所有线程完成")3.3 线程池 ThreadPoolExecutor
concurrent.futures.ThreadPoolExecutor 提供了线程池功能,用于管理和复用线程。
3.3.1 线程池的优势
- 资源复用:避免频繁创建销毁线程的开销
- 并发控制:限制最大线程数,防止资源耗尽
- 任务队列:自动管理任务分配
- 简化编程:提供高级接口,简化并发编程
3.3.2 ThreadPoolExecutor 使用
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
"""模拟任务"""
time.sleep(0.5)
return n * n
if __name__ == '__main__':
# 使用线程池执行任务
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交多个任务
futures = [executor.submit(task, i) for i in range(5)]
# 获取结果
results = [future.result() for future in futures]
print(f"结果: {results}")4. 线程间通信与同步
4.1 线程数据共享
由于线程共享进程的内存空间,线程间可以直接访问共享变量,但这也带来了数据安全问题。
- 多线程一般考虑的是线程同步!
4.1.1 数据共享示例
import threading
import time
# 共享变量
counter = 0
def unsafe_increment(name, n):
global counter
for _ in range(n):
temp = counter
temp += 1
counter = temp # 非原子操作,可能产生竞态条件
time.sleep(0.0001) # 增加竞态条件的出现概率
if __name__ == '__main__':
threads = []
for i in range(2):
t = threading.Thread(target=unsafe_increment, args=(f"Thread-{i}", 1000))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终计数 (期望2000): {counter}") # 结果可能不是20004.2 线程安全队列 Queue
queue.Queue 提供了线程安全的队列实现,适合生产者-消费者模式。
4.2.1 Queue 的特点
- 线程安全:内部使用锁保证操作的原子性
- 阻塞操作:支持阻塞的 put 和 get 操作
- 多种类型:Queue(FIFO)、LifoQueue(LIFO)、PriorityQueue(优先级)
4.2.2 Queue 使用示例
import queue
import threading
import time
def producer(q):
"""生产者:向队列添加数据"""
for i in range(3):
item = f"item-{i}"
q.put(item)
print(f"生产: {item}")
time.sleep(0.1)
def consumer(q):
"""消费者:从队列取出数据"""
while True:
try:
item = q.get(timeout=0.5)
print(f"消费: {item}")
q.task_done()
except queue.Empty:
break
if __name__ == '__main__':
q = queue.Queue()
# 创建并启动线程
producer_thread = threading.Thread(target=producer, args=(q,))
consumer_thread = threading.Thread(target=consumer, args=(q,))
producer_thread.start()
consumer_thread.start()
producer_thread.join()
q.join() # 等待队列处理完成4.3 线程同步原语
4.3.1 Lock 互斥锁
Lock 是最基本的同步原语,用于保护临界区。
import threading
counter = 0
lock = threading.Lock()
def safe_increment(name, n):
global counter
for _ in range(n):
with lock: # 获取锁,确保原子操作
counter += 1
if __name__ == '__main__':
threads = []
for _ in range(5):
t = threading.Thread(target=safe_increment, args=(f"Thread-{_}", 1000))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终计数: {counter}") # 应该是50004.3.2 RLock 递归锁
RLock 允许同一线程多次获取同一把锁。
import threading
import time
rlock = threading.RLock()
def recursive_function(name, depth):
with rlock:
print(f"{name}: 深度 {depth}")
if depth > 0:
recursive_function(name, depth - 1)
if __name__ == '__main__':
t = threading.Thread(target=recursive_function, args=("Thread-1", 3))
t.start()
t.join()4.3.3 Semaphore 信号量
控制同时访问资源的线程数量。
import threading
import time
# 最多允许2个线程同时访问
semaphore = threading.Semaphore(2)
def access_resource(name):
with semaphore:
print(f"{name} 获得资源访问权")
time.sleep(1)
print(f"{name} 释放资源")
if __name__ == '__main__':
threads = []
for i in range(4):
t = threading.Thread(target=access_resource, args=(f"Thread-{i}",))
threads.append(t)
t.start()
for t in threads:
t.join()4.3.4 Event 事件
用于线程间的简单信号通信。
import threading
import time
event = threading.Event()
def waiter(name):
print(f"{name} 等待事件...")
event.wait()
print(f"{name} 收到事件,开始工作")
def setter():
print("准备触发事件...")
time.sleep(2)
print("触发事件!")
event.set()
if __name__ == '__main__':
# 创建等待线程
waiters = [threading.Thread(target=waiter, args=(f"Waiter-{i}",)) for i in range(2)]
trigger = threading.Thread(target=setter)
for w in waiters:
w.start()
trigger.start()
for w in waiters:
w.join()
trigger.join()4.3.5 Condition 条件变量
提供更复杂的等待/通知机制。
import threading
import time
condition = threading.Condition()
items = []
def consumer(name):
with condition:
while len(items) == 0:
print(f"{name} 等待商品...")
condition.wait()
item = items.pop(0)
print(f"{name} 消费: {item}")
def producer():
for i in range(2):
time.sleep(1)
with condition:
item = f"商品-{i}"
items.append(item)
print(f"生产: {item}")
condition.notify_all() # 通知所有等待线程
if __name__ == '__main__':
# 创建消费者和生产者
consumer1 = threading.Thread(target=consumer, args=("消费者-1",))
consumer2 = threading.Thread(target=consumer, args=("消费者-2",))
producer_thread = threading.Thread(target=producer)
consumer1.start()
consumer2.start()
producer_thread.start()
consumer1.join()
consumer2.join()
producer_thread.join()4.3.6 Barrier 屏障
让一组线程在某个点同步。
import threading
import time
barrier = threading.Barrier(3)
def worker(name):
# 第一阶段工作
print(f"{name} 开始第一阶段")
time.sleep(1)
print(f"{name} 到达屏障,等待其他线程...")
barrier.wait() # 等待所有线程到达
# 第二阶段工作
print(f"{name} 开始第二阶段")
time.sleep(0.5)
print(f"{name} 完成")
if __name__ == '__main__':
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(f"Worker-{i}",))
threads.append(t)
t.start()
for t in threads:
t.join()4.4 线程本地存储 ThreadLocal
ThreadLocal 为每个线程提供独立的变量副本。
- 线程隔离:每个线程拥有自己的数据副本
- 避免锁竞争:不需要同步机制
- 跨函数访问:同一线程内可跨函数访问数据
4.4.2 ThreadLocal 使用示例
import threading
import time
# 创建ThreadLocal对象
local_data = threading.local()
def process_data(user_id):
# 设置线程本地数据
local_data.user_id = user_id
local_data.timestamp = time.time()
# 调用其他函数,无需传参
validate()
save()
def validate():
# 直接访问线程本地数据
print(f"验证用户: {local_data.user_id}")
def save():
# 直接访问线程本地数据
print(f"保存用户{local_data.user_id}的数据,时间: {local_data.timestamp:.2f}")
if __name__ == '__main__':
threads = []
for i in range(3):
t = threading.Thread(target=process_data, args=(f"user_{i}",))
threads.append(t)
t.start()
for t in threads:
t.join()5. 全局解释器锁(GIL)
5.1 GIL 的概念与影响
5.1.1 GIL 的定义
全局解释器锁(Global Interpreter Lock,GIL)是 CPython 解释器的一个机制,它保证同一时刻只有一个线程执行 Python 字节码。
GIL 的作用:
- 保护 Python 对象的引用计数
- 简化 CPython 的实现
- 避免多线程访问 Python 对象时的竞争条件
5.1.2 GIL 对性能的影响 --- 是的多线程主要是在 IO 密集任务中表现比较好
CPU 密集型任务:
- 多线程无法利用多核 CPU(在传统 Python 版本中)
- 性能可能比单线程更差(线程切换开销)
I/O 密集型任务:
- GIL 在 I/O 操作时会被释放
- 多线程可以提高并发性能
5.1.3 性能对比示例
import threading
import time
def cpu_bound(n):
"""CPU密集型任务"""
total = 0
for i in range(n):
total += i
return total
def io_bound(n):
"""I/O密集型任务"""
for i in range(n):
time.sleep(0.01) # 模拟I/O操作
def test_performance():
n = 1000000 # 减少计算量以缩短运行时间
# CPU密集型 - 单线程
start = time.time()
cpu_bound(n)
single_time = time.time() - start
print(f"CPU密集型 - 单线程: {single_time:.2f}秒")
# CPU密集型 - 多线程
start = time.time()
threads = []
for _ in range(2):
t = threading.Thread(target=cpu_bound, args=(n//2,))
threads.append(t)
t.start()
for t in threads:
t.join()
multi_time = time.time() - start
print(f"CPU密集型 - 多线程: {multi_time:.2f}秒")
def test_io_performance():
"""测试I/O密集型任务的性能"""
n = 5
# I/O密集型 - 单线程
start = time.time()
for _ in range(3):
io_bound(n)
single_io_time = time.time() - start
print(f"I/O密集型 - 单线程: {single_io_time:.2f}秒")
# I/O密集型 - 多线程
start = time.time()
threads = []
for _ in range(3):
t = threading.Thread(target=io_bound, args=(n,))
threads.append(t)
t.start()
for t in threads:
t.join()
multi_io_time = time.time() - start
print(f"I/O密集型 - 多线程: {multi_io_time:.2f}秒")
print(f"I/O任务加速比: {single_io_time/multi_io_time:.2f}x")
if __name__ == '__main__':
test_performance()
test_io_performance()5.2 绕过 GIL 的策略
5.2.1 使用多进程
对于 CPU 密集型任务,使用multiprocessing替代threading是传统的解决方案。
5.2.2 使用 C 扩展
某些 C 扩展库(如 NumPy、SciPy、Pandas)在执行计算密集型操作时会释放 GIL,从而实现真正的并行。
5.2.3 使用 Python 3.13+的自由线程模式
Python 3.13 引入了实验性的自由线程模式(Free-threading),可以通过编译时选项--enable-free-threaded启用,这将完全移除 GIL:
# 编译时启用自由线程模式
./configure --enable-free-threaded --with-lto
make在自由线程模式下:
- 消除了 GIL,实现真正的多线程并行
- 需要重新编译所有 C 扩展以支持自由线程
- 对现有代码可能需要少量调整以确保线程安全
- 性能收益在 CPU 密集型任务中尤为显著
5.2.4 使用其他 Python 实现
- Jython:运行在 JVM 上,没有 GIL
- IronPython:运行在.NET 上,没有 GIL
- PyPy:可通过编译选项移除 GIL,但需要特定配置
6. 线程应用最佳实践
6.1 选择线程还是进程
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| CPU 密集型计算 | 多进程 | 避免 GIL 限制 |
| I/O 密集型任务 | 多线程 | 轻量级,切换快 |
| 大量短任务 | 线程池 | 复用线程,减少创建开销 |
| 需要隔离的任务 | 多进程 | 进程间完全隔离 |
| 共享大量数据 | 多线程 | 共享内存,通信简单 |
| Web 爬虫 | 多线程/异步 | I/O 密集型 |
| 数据分析 | 多进程 | CPU 密集型 |
6.2 线程安全编程准则
- 最小化共享状态
- 尽量使用局部变量
- 使用不可变对象
- 使用 ThreadLocal 存储线程特定数据
- 正确使用同步机制
- 选择合适的锁类型(Lock、RLock、Semaphore)
- 使用 with 语句确保锁的释放
- 避免过度同步
- 使用线程安全的数据结构
- queue.Queue 代替 list
- threading.local 代替全局变量
- 避免死锁
- 固定加锁顺序
- 使用超时机制
- 尽量减少锁的持有时间
总结
Python 多线程编程的核心要点:
- 线程基础理论:线程的定义、组成、内存模型、状态转换和调度机制
- 特殊线程类型:守护线程的特点和应用场景
- 线程创建与管理:threading.Thread 类的使用、自定义线程类、线程池
- 线程间通信与同步:
- 数据共享机制
- Queue 线程安全队列
- 同步原语:Lock、RLock、Semaphore、Event、Condition、Barrier
- ThreadLocal 线程本地存储
- GIL 全局解释器锁:理解 GIL 的影响和应对策略(包括 Python 3.13+的自由线程模式)
- 最佳实践:线程 vs 进程的选择、线程安全编程、实际应用案例
关键记忆点:
- 线程适合 I/O 密集型任务,进程适合 CPU 密集型任务
- GIL 限制了传统 Python 多线程在 CPU 密集型任务上的性能,但 Python 3.13+提供了自由线程选项
- 正确使用同步机制避免数据竞争和死锁
- 优先使用高级抽象(ThreadPoolExecutor、Queue)而非底层原语
- 线程编程需要特别注意共享数据的安全性