Skip to content

第三部分:多线程

1. 线程基础理论

1.1 线程的定义与组成

1.1.1 线程的定义

  • 狭义定义:线程是进程中的一个执行单元,是 CPU 调度和分派的基本单位。
  • 广义定义:线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。一个进程中可以并发多个线程,每条线程并行执行不同的任务。

1.1.2 线程与进程的区别

  • 进程是资源分配的基本单位,拥有独立的地址空间。
  • 线程是 CPU 调度的基本单位,共享进程的地址空间。

一个进程可以包含多个线程,这些线程共享进程的资源(内存、文件描述符等),但每个线程有自己独立的执行栈和程序计数器。

1.1.3 线程的组成

一个线程主要由以下部分组成:

  1. 线程 ID(TID):线程的唯一标识符,在进程内唯一。
  2. 程序计数器(PC):指向线程当前执行的指令地址。
  3. 寄存器集合:保存线程的工作变量和中间结果。
  4. 栈(Stack):存储线程的局部变量、函数调用信息和返回地址,每个线程有独立的栈。
  5. 线程控制块(TCB):类似于进程的 PCB,记录线程的状态信息、优先级等。

线程共享的进程资源

  • 代码段(Text Segment)
  • 数据段(Data Segment)
  • BSS 段
  • 堆区(Heap)
  • 文件描述符表
  • 信号处理器
  • 进程 ID 和进程组 ID

1.2 线程控制块与内存模型

1.2.1 线程控制块(TCB)

线程控制块(Thread Control Block,TCB)是操作系统用于管理线程的数据结构。每个线程都有一个 TCB,它包含:

  1. 线程标识信息
    • 线程 ID(TID)
    • 所属进程 ID(PID)
    • 线程组 ID
  2. 处理器状态信息
    • 程序计数器(PC)
    • CPU 寄存器内容
    • 栈指针(SP)
  3. 线程调度信息
    • 线程状态(新建、就绪、运行、阻塞、终止)
    • 线程优先级
    • 调度策略参数
  4. 栈信息
    • 用户栈指针
    • 内核栈指针
    • 栈大小限制

1.2.2 线程内存布局

线程在进程内存空间中的布局特点:

线程独有的内存区域

  • 线程栈:每个线程有自己的栈空间,存储局部变量和函数调用信息
  • 线程局部存储(TLS):线程私有的全局变量存储区域
  • 寄存器状态:每个线程独立的 CPU 寄存器值

线程共享的内存区域

  • 代码段:所有线程执行相同的程序代码
  • 数据段和 BSS 段:全局变量和静态变量
  • 堆区:动态分配的内存
  • 内存映射区:共享库、文件映射等

1.2.3 线程上下文切换

线程上下文切换比进程上下文切换更轻量,主要步骤:

  1. 保存当前线程的 CPU 寄存器状态
  2. 更新当前线程的 TCB 信息
  3. 选择下一个要执行的线程
  4. 恢复被选中线程的 CPU 寄存器状态
  5. 切换栈指针到新线程的栈
  6. 跳转到新线程的执行点

由于线程共享地址空间,不需要切换页表和刷新 TLB,所以比进程切换更快。

1.3 线程标识符与层次关系

1.3.1 线程标识符(TID)

每个线程都有一个唯一的线程 ID(TID),在进程内唯一标识该线程。与进程的 PID 类似,TID 用于:

  • 线程调度和管理
  • 线程间通信和同步
  • 调试和监控

在 Linux 系统中,线程实际上是轻量级进程(LWP),每个线程都有系统级的 TID。

1.3.2 主线程与工作线程

主线程(Main Thread)

  • 进程创建时自动创建的第一个线程
  • 负责进程的初始化和其他线程的创建
  • 主线程结束通常意味着整个进程结束
  • 拥有进程的主要执行流

工作线程(Worker Thread)

  • 由主线程或其他线程创建的辅助线程
  • 执行特定的任务或功能
  • 可以是守护线程或非守护线程
  • 生命周期可以独立于主线程(非守护线程)或依赖于主线程(守护线程)

1.4 线程状态与调度

1.4.1 线程状态

线程的生命周期包含以下几种状态:

五种基本状态

  1. 新建状态(New):线程对象被创建但尚未启动
  2. 就绪状态(Runnable):线程可以运行,正在等待 CPU 时间片
  3. 运行状态(Running):线程获得 CPU,正在执行
  4. 阻塞状态(Blocked):线程因等待某个条件而暂停执行
  5. 终止状态(Terminated):线程执行完毕或被强制终止

1.4.2 线程调度

线程调度通常由操作系统的线程调度器负责,主要策略与进程调度类似:

用户级线程 vs 内核级线程

  1. 用户级线程
    • 由用户空间的线程库管理
    • 切换快速,不需要内核参与
    • 一个线程阻塞会导致整个进程阻塞
    • Python 的早期线程实现
  2. 内核级线程
    • 由操作系统内核直接管理
    • 切换需要系统调用,开销较大
    • 真正的并发执行(在多核系统上)
    • 现代 Python 线程的实现方式

Python 线程调度特点

  • 使用操作系统原生线程(1:1 模型)
  • 受 GIL(全局解释器锁)限制(除 Python 3.13+自由线程模式)
  • 在 I/O 操作时会释放 GIL,允许其他线程运行
  • CPU 密集型任务无法真正并行(在带 GIL 的 Python 版本中)

1.5 线程 vs 进程对比总结

特性进程线程
定义资源分配的基本单位CPU 调度的基本单位
地址空间独立的地址空间共享进程的地址空间
资源开销创建销毁开销大创建销毁开销小
内存占用独立内存,占用较大共享内存,占用较小
通信方式IPC(管道、队列、共享内存等)直接读写共享变量
上下文切换慢(需切换地址空间)快(同一地址空间)
并发性真正的并行(多核)Python 中受 GIL 限制(传统版本)
稳定性一个进程崩溃不影响其他一个线程崩溃可能影响整个进程
数据共享显式通信,安全性高隐式共享,需要同步
适用场景CPU 密集型、需要隔离I/O 密集型、需要共享数据

2. 特殊线程类型

2.1 守护线程深入解析

守护线程(Daemon Thread)是一种在后台运行的特殊线程,主要用于执行辅助性任务。

2.1.1 守护线程的特点

  • 生命周期依赖:守护线程的生命周期依赖于非守护线程,当所有非守护线程结束时,守护线程会被强制终止
  • 后台运行:通常用于执行后台任务,如垃圾回收、日志记录、监控等
  • 自动清理:程序退出时不需要等待守护线程完成
  • 不保证完成:由于会被强制终止,守护线程的任务可能无法完成

2.1.2 守护线程与普通线程对比

特性普通线程守护线程
生命周期独立运行直到完成随非守护线程结束而终止
程序退出程序会等待其完成程序不等待,直接终止
任务保证保证任务完成不保证任务完成
典型用途主要业务逻辑辅助任务、监控、日志
设置方式默认daemon=Falsethread.daemon=True

2.2 主线程与工作线程

2.2.1 主线程特性

  • 自动创建:程序启动时自动创建的第一个线程
  • 程序入口:执行程序的主要逻辑
  • 管理职责:负责创建和管理其他线程
  • 退出影响:主线程结束可能导致整个程序退出(取决于是否有非守护线程)

2.2.2 工作线程特性

  • 按需创建:根据任务需要创建
  • 专门任务:通常负责特定的功能模块
  • 并发执行:多个工作线程可以并发执行
  • 资源共享:与主线程共享进程资源

3. 线程创建与管理

3.1 threading.Thread 的工作原理

threading.Thread 是 Python 中创建和管理线程的主要类,它封装了底层的线程操作。

3.1.1 Thread 类工作机制

当调用 Thread.start() 时的执行流程:

  1. 创建新的系统线程
  2. 在新线程中调用 run() 方法
  3. run() 方法执行 target 指定的函数
  4. 函数执行完毕,线程自动终止

3.1.2 基础示例

python
import threading
import time

def worker_task(name, duration):
    """工作线程任务"""
    print(f"线程 {name} 开始")
    for i in range(duration):
        time.sleep(1)
        print(f"线程 {name} 工作中... {i+1}/{duration}")
    print(f"线程 {name} 完成")

if __name__ == '__main__':
    print(f"主线程开始")

    # 创建线程
    thread = threading.Thread(
        target=worker_task,
        args=("Worker-1", 3)
    )

    print("启动工作线程...")
    thread.start()

    print("主线程继续执行")

    print("等待工作线程完成...")
    thread.join()

    print("所有任务完成")

3.1.3 Thread 类的方法和属性

常用方法

方法名描述
t.start()启动线程,调用 run()方法
t.run()线程启动时执行的方法
t.join([timeout])等待线程结束,可指定超时时间
t.is_alive()检查线程是否还在运行

常用属性

属性名描述
t.name线程名称
t.ident线程标识符(启动后才有)
t.daemon是否为守护线程
t.native_id系统原生线程 ID(Python 3.8+)

3.2 自定义 Thread 类

通过继承 threading.Thread 类可以创建自定义线程类,便于封装复杂的线程逻辑。

3.2.1 继承 Thread 类

python
import threading
import time

class CountThread(threading.Thread):
    """计数线程类示例"""

    def __init__(self, name, count):
        super().__init__(name=name)
        self.count = count

    def run(self):
        """重写run方法"""
        for i in range(self.count):
            print(f"{self.name}: {i}")
            time.sleep(0.5)
        print(f"{self.name} 完成计数")

if __name__ == '__main__':
    # 创建并启动线程
    t1 = CountThread("Thread-1", 3)
    t2 = CountThread("Thread-2", 3)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print("所有线程完成")

3.3 线程池 ThreadPoolExecutor

concurrent.futures.ThreadPoolExecutor 提供了线程池功能,用于管理和复用线程。

3.3.1 线程池的优势

  • 资源复用:避免频繁创建销毁线程的开销
  • 并发控制:限制最大线程数,防止资源耗尽
  • 任务队列:自动管理任务分配
  • 简化编程:提供高级接口,简化并发编程

3.3.2 ThreadPoolExecutor 使用

python
from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    """模拟任务"""
    time.sleep(0.5)
    return n * n

if __name__ == '__main__':
    # 使用线程池执行任务
    with ThreadPoolExecutor(max_workers=3) as executor:
        # 提交多个任务
        futures = [executor.submit(task, i) for i in range(5)]

        # 获取结果
        results = [future.result() for future in futures]
        print(f"结果: {results}")

4. 线程间通信与同步

4.1 线程数据共享

由于线程共享进程的内存空间,线程间可以直接访问共享变量,但这也带来了数据安全问题。

  • 多线程一般考虑的是线程同步!

4.1.1 数据共享示例

python
import threading
import time

# 共享变量
counter = 0

def unsafe_increment(name, n):
    global counter
    for _ in range(n):
        temp = counter
        temp += 1
        counter = temp  # 非原子操作,可能产生竞态条件
        time.sleep(0.0001)  # 增加竞态条件的出现概率

if __name__ == '__main__':
    threads = []
    for i in range(2):
        t = threading.Thread(target=unsafe_increment, args=(f"Thread-{i}", 1000))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print(f"最终计数 (期望2000): {counter}")  # 结果可能不是2000

4.2 线程安全队列 Queue

queue.Queue 提供了线程安全的队列实现,适合生产者-消费者模式。

4.2.1 Queue 的特点

  • 线程安全:内部使用锁保证操作的原子性
  • 阻塞操作:支持阻塞的 put 和 get 操作
  • 多种类型:Queue(FIFO)、LifoQueue(LIFO)、PriorityQueue(优先级)

4.2.2 Queue 使用示例

python
import queue
import threading
import time

def producer(q):
    """生产者:向队列添加数据"""
    for i in range(3):
        item = f"item-{i}"
        q.put(item)
        print(f"生产: {item}")
        time.sleep(0.1)

def consumer(q):
    """消费者:从队列取出数据"""
    while True:
        try:
            item = q.get(timeout=0.5)
            print(f"消费: {item}")
            q.task_done()
        except queue.Empty:
            break

if __name__ == '__main__':
    q = queue.Queue()

    # 创建并启动线程
    producer_thread = threading.Thread(target=producer, args=(q,))
    consumer_thread = threading.Thread(target=consumer, args=(q,))

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    q.join()  # 等待队列处理完成

4.3 线程同步原语

4.3.1 Lock 互斥锁

Lock 是最基本的同步原语,用于保护临界区。

python
import threading

counter = 0
lock = threading.Lock()

def safe_increment(name, n):
    global counter
    for _ in range(n):
        with lock:  # 获取锁,确保原子操作
            counter += 1

if __name__ == '__main__':
    threads = []
    for _ in range(5):
        t = threading.Thread(target=safe_increment, args=(f"Thread-{_}", 1000))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print(f"最终计数: {counter}")  # 应该是5000

4.3.2 RLock 递归锁

RLock 允许同一线程多次获取同一把锁。

python
import threading
import time

rlock = threading.RLock()

def recursive_function(name, depth):
    with rlock:
        print(f"{name}: 深度 {depth}")
        if depth > 0:
            recursive_function(name, depth - 1)

if __name__ == '__main__':
    t = threading.Thread(target=recursive_function, args=("Thread-1", 3))
    t.start()
    t.join()

4.3.3 Semaphore 信号量

控制同时访问资源的线程数量。

python
import threading
import time

# 最多允许2个线程同时访问
semaphore = threading.Semaphore(2)

def access_resource(name):
    with semaphore:
        print(f"{name} 获得资源访问权")
        time.sleep(1)
        print(f"{name} 释放资源")

if __name__ == '__main__':
    threads = []
    for i in range(4):
        t = threading.Thread(target=access_resource, args=(f"Thread-{i}",))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

4.3.4 Event 事件

用于线程间的简单信号通信。

python
import threading
import time

event = threading.Event()

def waiter(name):
    print(f"{name} 等待事件...")
    event.wait()
    print(f"{name} 收到事件,开始工作")

def setter():
    print("准备触发事件...")
    time.sleep(2)
    print("触发事件!")
    event.set()

if __name__ == '__main__':
    # 创建等待线程
    waiters = [threading.Thread(target=waiter, args=(f"Waiter-{i}",)) for i in range(2)]
    trigger = threading.Thread(target=setter)

    for w in waiters:
        w.start()
    trigger.start()

    for w in waiters:
        w.join()
    trigger.join()

4.3.5 Condition 条件变量

提供更复杂的等待/通知机制。

python
import threading
import time

condition = threading.Condition()
items = []

def consumer(name):
    with condition:
        while len(items) == 0:
            print(f"{name} 等待商品...")
            condition.wait()
        item = items.pop(0)
        print(f"{name} 消费: {item}")

def producer():
    for i in range(2):
        time.sleep(1)
        with condition:
            item = f"商品-{i}"
            items.append(item)
            print(f"生产: {item}")
            condition.notify_all()  # 通知所有等待线程

if __name__ == '__main__':
    # 创建消费者和生产者
    consumer1 = threading.Thread(target=consumer, args=("消费者-1",))
    consumer2 = threading.Thread(target=consumer, args=("消费者-2",))
    producer_thread = threading.Thread(target=producer)

    consumer1.start()
    consumer2.start()
    producer_thread.start()

    consumer1.join()
    consumer2.join()
    producer_thread.join()

4.3.6 Barrier 屏障

让一组线程在某个点同步。

python
import threading
import time

barrier = threading.Barrier(3)

def worker(name):
    # 第一阶段工作
    print(f"{name} 开始第一阶段")
    time.sleep(1)

    print(f"{name} 到达屏障,等待其他线程...")
    barrier.wait()  # 等待所有线程到达

    # 第二阶段工作
    print(f"{name} 开始第二阶段")
    time.sleep(0.5)
    print(f"{name} 完成")

if __name__ == '__main__':
    threads = []
    for i in range(3):
        t = threading.Thread(target=worker, args=(f"Worker-{i}",))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

4.4 线程本地存储 ThreadLocal

ThreadLocal 为每个线程提供独立的变量副本。

  • 线程隔离:每个线程拥有自己的数据副本
  • 避免锁竞争:不需要同步机制
  • 跨函数访问:同一线程内可跨函数访问数据

4.4.2 ThreadLocal 使用示例

python
import threading
import time

# 创建ThreadLocal对象
local_data = threading.local()

def process_data(user_id):
    # 设置线程本地数据
    local_data.user_id = user_id
    local_data.timestamp = time.time()

    # 调用其他函数,无需传参
    validate()
    save()

def validate():
    # 直接访问线程本地数据
    print(f"验证用户: {local_data.user_id}")

def save():
    # 直接访问线程本地数据
    print(f"保存用户{local_data.user_id}的数据,时间: {local_data.timestamp:.2f}")

if __name__ == '__main__':
    threads = []
    for i in range(3):
        t = threading.Thread(target=process_data, args=(f"user_{i}",))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

5. 全局解释器锁(GIL)

5.1 GIL 的概念与影响

5.1.1 GIL 的定义

全局解释器锁(Global Interpreter Lock,GIL)是 CPython 解释器的一个机制,它保证同一时刻只有一个线程执行 Python 字节码。

GIL 的作用

  • 保护 Python 对象的引用计数
  • 简化 CPython 的实现
  • 避免多线程访问 Python 对象时的竞争条件

5.1.2 GIL 对性能的影响 --- 是的多线程主要是在 IO 密集任务中表现比较好

CPU 密集型任务

  • 多线程无法利用多核 CPU(在传统 Python 版本中)
  • 性能可能比单线程更差(线程切换开销)

I/O 密集型任务

  • GIL 在 I/O 操作时会被释放
  • 多线程可以提高并发性能

5.1.3 性能对比示例

python
import threading
import time

def cpu_bound(n):
    """CPU密集型任务"""
    total = 0
    for i in range(n):
        total += i
    return total

def io_bound(n):
    """I/O密集型任务"""
    for i in range(n):
        time.sleep(0.01)  # 模拟I/O操作

def test_performance():
    n = 1000000  # 减少计算量以缩短运行时间

    # CPU密集型 - 单线程
    start = time.time()
    cpu_bound(n)
    single_time = time.time() - start
    print(f"CPU密集型 - 单线程: {single_time:.2f}秒")

    # CPU密集型 - 多线程
    start = time.time()
    threads = []
    for _ in range(2):
        t = threading.Thread(target=cpu_bound, args=(n//2,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    multi_time = time.time() - start
    print(f"CPU密集型 - 多线程: {multi_time:.2f}秒")

def test_io_performance():
    """测试I/O密集型任务的性能"""
    n = 5

    # I/O密集型 - 单线程
    start = time.time()
    for _ in range(3):
        io_bound(n)
    single_io_time = time.time() - start
    print(f"I/O密集型 - 单线程: {single_io_time:.2f}秒")

    # I/O密集型 - 多线程
    start = time.time()
    threads = []
    for _ in range(3):
        t = threading.Thread(target=io_bound, args=(n,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    multi_io_time = time.time() - start
    print(f"I/O密集型 - 多线程: {multi_io_time:.2f}秒")
    print(f"I/O任务加速比: {single_io_time/multi_io_time:.2f}x")

if __name__ == '__main__':
    test_performance()
    test_io_performance()

5.2 绕过 GIL 的策略

5.2.1 使用多进程

对于 CPU 密集型任务,使用multiprocessing替代threading是传统的解决方案。

5.2.2 使用 C 扩展

某些 C 扩展库(如 NumPy、SciPy、Pandas)在执行计算密集型操作时会释放 GIL,从而实现真正的并行。

5.2.3 使用 Python 3.13+的自由线程模式

Python 3.13 引入了实验性的自由线程模式(Free-threading),可以通过编译时选项--enable-free-threaded启用,这将完全移除 GIL:

bash
# 编译时启用自由线程模式
./configure --enable-free-threaded --with-lto
make

在自由线程模式下:

  • 消除了 GIL,实现真正的多线程并行
  • 需要重新编译所有 C 扩展以支持自由线程
  • 对现有代码可能需要少量调整以确保线程安全
  • 性能收益在 CPU 密集型任务中尤为显著

5.2.4 使用其他 Python 实现

  • Jython:运行在 JVM 上,没有 GIL
  • IronPython:运行在.NET 上,没有 GIL
  • PyPy:可通过编译选项移除 GIL,但需要特定配置

6. 线程应用最佳实践

6.1 选择线程还是进程

场景推荐方案原因
CPU 密集型计算多进程避免 GIL 限制
I/O 密集型任务多线程轻量级,切换快
大量短任务线程池复用线程,减少创建开销
需要隔离的任务多进程进程间完全隔离
共享大量数据多线程共享内存,通信简单
Web 爬虫多线程/异步I/O 密集型
数据分析多进程CPU 密集型

6.2 线程安全编程准则

  1. 最小化共享状态
    • 尽量使用局部变量
    • 使用不可变对象
    • 使用 ThreadLocal 存储线程特定数据
  2. 正确使用同步机制
    • 选择合适的锁类型(Lock、RLock、Semaphore)
    • 使用 with 语句确保锁的释放
    • 避免过度同步
  3. 使用线程安全的数据结构
    • queue.Queue 代替 list
    • threading.local 代替全局变量
  4. 避免死锁
    • 固定加锁顺序
    • 使用超时机制
    • 尽量减少锁的持有时间

总结

Python 多线程编程的核心要点:

  1. 线程基础理论:线程的定义、组成、内存模型、状态转换和调度机制
  2. 特殊线程类型:守护线程的特点和应用场景
  3. 线程创建与管理:threading.Thread 类的使用、自定义线程类、线程池
  4. 线程间通信与同步
    • 数据共享机制
    • Queue 线程安全队列
    • 同步原语:Lock、RLock、Semaphore、Event、Condition、Barrier
    • ThreadLocal 线程本地存储
  5. GIL 全局解释器锁:理解 GIL 的影响和应对策略(包括 Python 3.13+的自由线程模式)
  6. 最佳实践:线程 vs 进程的选择、线程安全编程、实际应用案例

关键记忆点

  • 线程适合 I/O 密集型任务,进程适合 CPU 密集型任务
  • GIL 限制了传统 Python 多线程在 CPU 密集型任务上的性能,但 Python 3.13+提供了自由线程选项
  • 正确使用同步机制避免数据竞争和死锁
  • 优先使用高级抽象(ThreadPoolExecutor、Queue)而非底层原语
  • 线程编程需要特别注意共享数据的安全性

邬东升的博客