主页
avatar

Kared

使用多线程高效地利用资源:理解 GIL、锁和同步

1、线程的简介

线程(Thread),有时被称作轻量级进程,是操作系统分配处理器时间的基本单位。它是包含在进程中的,实际上代表了进程中的一个独立的执行流程。线程没有自己的资源,它仅拥有在执行中必须的少量资源(如寄存器状态、栈空间等),但能共享其所属进程的资源。

每个线程都维护着自己的执行上下文,这使得它可以被独立地调度和管理。最关键的上下文信息包括线程的指令指针和堆栈指针,这些信息使得线程在进程的地址空间中正确执行。

在多线程环境下,线程作为执行的基本单位,可以实现程序的并发执行。相较于进程,线程间的隔离程度低,它们共享内存和文件资源,这降低了通信的复杂性,并提高了资源的利用率。多线程的优点主要体现在以下几个方面:

  • 内存共享:线程之间共享内存空间比进程之间容易得多,这简化了数据共享的方式。
  • 资源开销小:与进程相比,线程的创建和上下文切换的资源消耗相对较小。
  • 性能提升:多线程可以提高程序的响应速度,特别是在进行I/O操作或其他等待任务时,可以显著提升程序的执行效率。
  • Python 支持:Python 提供了简单的多线程编程接口,使得开发者可以轻松地创建和管理线程。

2、线程的实现

① 使用 _thread 模块创建线程

Python 的 _thread 模块提供了低级别的、原始的线程以及一个简单的锁机制,允许直接在操作系统级别进行线程控制。这个模块是 Python 较早提供的多线程支持,主要用于启动新线程和简单的线程同步。

要创建和启动一个新线程,你可以使用 _thread 模块的 start_new_thread() 函数。该函数允许你派生一个新线程去执行特定的函数,它需要以下参数:

  • function:线程将要执行的函数。
  • args:作为元组 (tuple) 传递给线程函数的参数。
  • kwargs:可选参数,以关键字的形式传递给线程函数。

_thread 模块的 allocate_lock() 函数用于创建新的锁对象(LockType),这是一个基础的互斥锁,主要用于管理多个线程对共享资源访问的同步,防止多线程同时执行相同的操作。锁对象主要提供以下方法:

  • acquire(wait=None): 尝试获取锁。如果不设置 wait 或将其设为 True,调用将阻塞直至锁被释放并成功获取锁,返回 True。若 wait 设为 False,方法将非阻塞尝试获取锁,成功则返回 True,否则返回 False
  • locked(): 判断锁是否被获取。如果锁已经被任一线程获取,则返回 True;否则,返回 False
  • release(): 释放锁,仅锁的持有者可以释放。释放后,其他在 acquire() 上阻塞的线程会尝试获取锁。

此外,exit() 函数用于请求线程退出,通常当线程的函数执行完毕时就会自动退出,因此通常不需要直接调用此函数。

创建线程以及锁的示例:

import _thread
import time

# 定义线程执行的函数
def print_time(thread_name, delay, repeat):
    count = 0
    while count < repeat:
        time.sleep(delay)
        print(f"{thread_name}: {time.ctime(time.time())}")
        count += 1

# 创建线程的示例
try:
    _thread.start_new_thread(print_time, ("Thread-1", 2, 5))
    _thread.start_new_thread(print_time, ("Thread-2", 4, 2))
except Exception as e:
    print(f"无法启动线程: {e}")

# 创建一个锁
lock = _thread.allocate_lock()

# 使用锁的线程函数
def thread_function_with_lock(name, delay):
    while True:
        # 获取锁
        lock.acquire()
        try:
            print_time(name, delay, 1)
        finally:
            # 释放锁
            lock.release()
        time.sleep(delay)

# 创建并启动使用锁的线程
try:
    _thread.start_new_thread(thread_function_with_lock, ("Thread-3", 2))
    _thread.start_new_thread(thread_function_with_lock, ("Thread-4", 2))
except Exception as e:
    print(f"无法启动线程: {e}")

# 主线程中的无限循环,防止程序退出
while True:
    pass

在上述代码中,print_time 函数被两个线程 Thread-1Thread-2 调用,它们将按照指定的延迟打印当前时间。我们还展示了如何使用 allocate_lock() 创建锁,并在 thread_function_with_lock 函数中使用这个锁来同步 Thread-3Thread-4 的行为。

[scode type=“green” size=""]

**注意:**主线程中的 while True: pass 是必须的,因为它可以防止主线程退出,这确保了之前创建的线程能够继续运行。如果移除这段代码,可能会导致线程无法正常执行。然而,这种编程方式已经不是最佳实践,推荐使用更高级的 threading 模块。

[/scode]

threading 模块的使用

在接下来的内容中,我们将主要使用 threading 模块来演示多线程的实现。threading 模块提供了对于线程的高级操作,它包括了 _thread 模块的所有功能,并且添加了一些额外的方法和类。

threading 模块常用方法:

  • threading.currentThread(): 返回当前正在执行的线程对象,非常适合用来跟踪和调试。
  • threading.enumerate(): 返回一个包含所有活动线程的列表,包括主线程,有助于监控程序行为。
  • threading.activeCount(): 返回活跃线程的数量,实际上就是 threading.enumerate() 返回列表的长度,用于调试和监控。

Thread 类方法 为线程的控制和管理提供支持,通过合理使用这些方法,可以提升程序并行处理任务的能力及效率。

  • run(): 定义线程的行为。默认情况下,它执行构造器中指定的可调用对象,也可在子类中被重写。
  • start(): 启动线程,内部会调用 run 方法。
  • join([time]): 阻塞调用线程直到线程结束。如果指定了 time 参数,最多阻塞 time 秒。
  • is_alive(): 判断线程是否还在运行,返回 True 表示活动状态。
  • getName()setName(): 用于获取和设置线程的名称,有助于更好地管理和调试线程。

基础线程创建与运行

threading 模块允许用户利用多线程执行任务,从而提高程序的执行效率和响应速度,下面是简单实现的多线程的代码:

import threading
import time

def print_time(threadName, sleep_time, times):
    for item in range(times):
        time.sleep(sleep_time)
        print(f'{threadName}: {time.ctime(time.time())}')

if __name__ == '__main__':
    # 初始化并启动两个线程
    thread1 = threading.Thread(target=print_time, args=("Thread-1", 1, 6))
    thread2 = threading.Thread(target=print_time, args=("Thread-2", 2, 3))
    thread1.start()
    thread2.start()

    print("Main thread end.")

在这段代码中,我们定义了一个工作函数 print_time,然后通过 threading.Thread 传递该函数及其所需的参数以创建新线程,并使用 start() 方法来启动它们。值得注意的是,即使主线程已结束,这些子线程仍将在后台继续执行。

使用子类创建自定义线程

通过继承 threading.Thread 类,我们可以创建具有更多自定义功能的线程。

import threading
import time

class myThread(threading.Thread):
    def __init__(self, threadName, sleep_time, times):
        super().__init__()
        self.threadName = threadName
        self.sleep_time = sleep_time
        self.times = times

    def run(self):
        print(self.threadName + " start.")
        print_time(self.threadName, self.sleep_time, self.times)
        print(self.threadName + " end.")

def print_time(threadName, sleep_time, times):
    for item in range(times):
        time.sleep(sleep_time)
        print(f'{threadName}: {time.ctime(time.time())}')

if __name__ == '__main__':
    thread1 = myThread("Thread-1", 1, 6)
    thread2 = myThread("Thread-2", 2, 3)
    thread1.start()
    thread2.start()
    print("Main thread end.")

通过定义一个继承自 ThreadmyThread 类,我们可以在 run 方法中定制线程开始和结束时的特定行为。这种面向对象的方法使得代码不仅更加易于管理,也更易于理解。

守护线程

在某些情况下,我们希望子线程能随主线程的结束而自动结束,这就需要用到守护线程的概念。

import threading
import time

def print_time(threadName, sleep_time, times):
    for item in range(times):
        time.sleep(sleep_time)
        print(f'{threadName}: {time.ctime(time.time())}')

if __name__ == '__main__':
    thread1 = threading.Thread(target=print_time, args=("Thread-1", 1, 6), daemon=True)
    thread2 = threading.Thread(target=print_time, args=("Thread-2", 2, 3), daemon=True)
    thread1.start()
    thread2.start()
  
    print("Main thread end.")

在此代码段中,通过设置 daemon=True,我们将 thread1thread2 设为守护线程。这意味着,一旦主线程完成其任务并结束,这两个守护子线程也会立刻终止,而不会继续其执行流。

使用 join() 方法协调线程

为了确保主线程在所有子线程执行完毕后再结束,我们可以利用 join() 方法。

import threading
import time

def print_time(threadName, sleep_time, times):
    for item in range(times):
        time.sleep(sleep_time)
        print(f'{threadName}: {time.ctime(time.time())}')

if __name__ == '__main__':
    thread1 = threading.Thread(target=print_time, args=("Thread-1", 1, 6))
    thread2 = threading.Thread(target=print_time, args=("Thread-2", 2, 3))
    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()
    print("Main thread end.")

通过在此段代码中应用 join() 方法,我们让主线程等待 thread1thread2 这两个子线线程各自结束后才结束主线程。这样的同步机制确保了所有后台任务能够完全执行完毕。

3、多线程的同步机制

线程作为进程的执行分支,可以共享进程的内存与资源,便于在同一进程内的线程间交换数据。但这也引入了并发控制的需求,因为多线程同时修改同一数据可能会导致冲突和不一致性。因此,确保线程间的精确同步是保障数据完整性与一致性的关键。通过引入同步机制,我们能够控制在特定时间内只许一个线程修改共享数据,以此避免并发中的数据竞态问题。

多线程中的数据共享和冲突

在没有适当的同步机制时,当多个线程同时访问和修改同一个全局变量,就可能发生数据竞争,线程的操作可能会相互干扰,导致数据的不一致和预期之外的结果。为了演示这种情况,可是运行查看以下的代码示例:

import threading
import time

# 定义一个全局变量
global_var = 0

# 定义线程 1 的目标函数,尝试增加全局变量的值
def thread1_func():
    global global_var
    for _ in range(5):
        print("Thread 1 trying to modify global_var")
        temp = global_var
        time.sleep(0.1)  # 模拟长时间的数据处理过程
        global_var = temp + 1
        print(f"Thread 1: global_var is now {global_var}")

# 定义线程 2 的目标函数,尝试减少全局变量的值
def thread2_func():
    global global_var
    for _ in range(5):
        print("Thread 2 trying to modify global_var")
        temp = global_var
        time.sleep(0.1)  # 模拟长时间的数据处理过程
        global_var = temp - 1
        print(f"Thread 2: global_var is now {global_var}")

if __name__ == "__main__":
    # 创建线程
    thread1 = threading.Thread(target=thread1_func)
    thread2 = threading.Thread(target=thread2_func)

    # 启动线程
    thread1.start()
    thread2.start()

    # 等待线程完成
    thread1.join()
    thread2.join()

    print(f"Final value of global_var is {global_var}")

在这个例子中,我们没有使用任何同步机制,比如锁,来保护 global_var 在被线程访问和修改时的一致性。这样,当线程 1 和线程 2 同时运行时,它们都试图修改 global_var 的值,但由于缺乏同步,这两个线程有可能在它们各自认为 global_var 还未改变时,就读取、计算并试图写入新值。

由于线程的调度是由操作系统控制的,我们不能保证线程的执行顺序。因此,线程 2 可能在线程 1 读取 global_var 后、在计算其值之前执行,反之亦然。这导致 global_var 的最终结果依赖于线程的具体执行顺序,而这通常是不确定的。实际上,线程的这种非确定性执行模式可能导致预期之外的结果,这就是所谓的“竞态条件”。

实现线程同步:锁、信号量与事件

为了维护数据一致性,threading 模块提供了互斥锁(Lock)和递归锁(RLock)对象,其都提供了获取(acquire())和释放(release())方法,允许线程安全地锁定和释放特定资源。当需要顺序而非并发地访问资源时,利用这些方法对关键操作区块进行锁定,可以高效地实现线程间的同步。此外,Python 还提供了其他同步机制,如信号量(Semaphore)和事件(Event),以支持不同场景下对数据完整性和一致性的保护。

保护共享资源的互斥锁 (Lock)

互斥锁(Lock)是线程同步中的一种基本机制,它可以确保在任何时刻只有一个线程可以访问共享资源,从而防止多线程因并发访问而导致数据混乱的问题。

主要方法

  • acquire():线程调用此方法尝试获取锁。如果锁已被其他线程持有,则当前线程会阻塞等待直到锁被释放。如果锁被另一个线程持有,当前线程将根据 blockingtimeout 参数等待锁释放。
  • release():线程通过调用此方法来释放它所持有的锁,这样其他阻塞等待该锁的线程就可以获取锁并继续执行。

互斥锁的实现:为了防止死锁或程序逻辑错误,通常将 acquire()release() 方法调用放在 try/finally 块中,以确保即便在资源操作过程中发生异常,锁也能被正确释放。

import threading
import time

# 定义一个全局变量
global_var = 0

# 创建一个互斥锁
lock = threading.Lock()

# 定义线程 1 的目标函数,尝试增加全局变量的值
def thread1_func():
    global global_var
    for _ in range(5):
        print("Thread 1 trying to modify global_var")
        # 在修改前请求锁定
        lock.acquire()
        try:
            temp = global_var
            time.sleep(0.1)  # 模拟长时间的数据处理过程
            global_var = temp + 1
            print(f"Thread 1: global_var is now {global_var}")
        finally:
            # 确保锁会被释放
            lock.release()

# 定义线程 2 的目标函数,尝试减少全局变量的值
def thread2_func():
    global global_var
    for _ in range(5):
        print("Thread 2 trying to modify global_var")
        # 在修改前请求锁定
        lock.acquire()
        try:
            temp = global_var
            time.sleep(0.1)  # 模拟长时间的数据处理过程
            global_var = temp - 1
            print(f"Thread 2: global_var is now {global_var}")
        finally:
            # 确保锁会被释放
            lock.release()

if __name__ == "__main__":
    # 创建线程
    thread1 = threading.Thread(target=thread1_func)
    thread2 = threading.Thread(target=thread2_func)

    # 启动线程
    thread1.start()
    thread2.start()

    # 等待线程完成
    thread1.join()
    thread2.join()

    print(f"Final value of global_var is {global_var}")

通过对原程序的修改,我们引入了互斥锁来同步两个线程对全局变量 global_var 的访问与修改。如此以来,即便两个线程交替执行,共享变量的读写操作也是线程安全的,避免了并发访问导致的数据冲突问题。

递归锁 (RLock) 的嵌套锁定

递归锁(RLock)是一种特殊类型的锁,允许已获得该锁的线程进行重复加锁。这意味着同一个线程可以多次 acquire() 递归锁,但必须保持对 release() 的调用次数相等,以确保最终释放锁。在嵌套访问共享资源时,递归锁(RLock)的使用避免了普通互斥锁可能导致的死锁问题,优雅地解决了复杂的同步需求。

主要方法

  • acquire(): 线程调用此方法以获得锁。如果该线程已经拥有锁,则可以再次无阻塞地获取它,增加内部计数器。如果锁被另一个线程持有,当前线程将根据 blockingtimeout 参数等待锁释放。
  • release(): 线程通过调用此方法释放锁。如果调用线程在释放之前多次获取了锁,则需要等同数量的 release() 调用后,锁才会最终被释放,使得其他线程可以争取锁定。

递归锁的实现:递归锁 (RLock) 允许同一个线程多次请求同一个锁而不造成死锁,给出了对共享资源进行嵌套访问的能力。为了保证资源的安全释放,即使在遇到异常的情况下,推荐使用 with 语句上下文管理器来自动处理锁的获取和释放。

代码示例及场景说明

假设我们有一个“账户”类,这个类中有存款和取款两个方法,每个方法在执行时都需要确保线程安全。此外,还有一个转账的操作,转账会先调用取款方法,然后调用存款方法。如果使用普通的互斥锁 (Lock),那么在转账方法中先后调用取款和存款方法时,会因为尝试两次获得锁而导致死锁。此时,使用递归锁 (RLock) 就可以避免这个问题。

import threading

class Account:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.RLock()

    def deposit(self, amount):
        with self.lock:
            self.balance += amount
            print(f'Deposited {amount}, new balance is {self.balance}')

    def withdraw(self, amount):
        with self.lock:
            if self.balance >= amount:
                self.balance -= amount
                print(f'Withdrew {amount}, new balance is {self.balance}')
            else:
                print('Insufficient funds')

    def transfer(self, target_account, amount):
        with self.lock:
            # 注意在一个已经加锁的方法中,再次调用了需要加锁的方法(取款、存款)
            self.withdraw(amount)
            target_account.deposit(amount)

# 示例使用
a1 = Account(1000)
a2 = Account(1000)

# 创建线程进行转账操作
t1 = threading.Thread(target=a1.transfer, args=(a2, 100))
t2 = threading.Thread(target=a2.transfer, args=(a1, 200))

t1.start()
t2.start()

t1.join()
t2.join()

print(f'Account 1 balance: {a1.balance}')
print(f'Account 2 balance: {a2.balance}')

这个例子中,Account 类包含了三个操作:存款、取款和转账。转账操作包含了先取款后存款的步骤,如果多个线程同时执行转账操作(比如账户之间互转),这时使用 RLock 可以避免因锁重入而导致的死锁问题。在 transfer 方法中,当 self.lock 已被当前线程获取后,同一线程中的 withdrawdeposit 方法可以再次“安全地”请求这个锁,从而避免死锁并保持操作的原子性。

信号量 (Semaphore) 机制

信号量(Semaphore)是一种用于控制访问共享资源的并发数的同步原语。基本思想是为共享资源设置一个计数器,表示可以同时访问该资源的线程数量上限。信号量主要提供两个操作:acquire(获取)和 release(释放)。

  • acquire():线程通过调用此方法从信号量获取一个单位。如果信号量的计数器为零,则调用线程将阻塞,直到信号量被释放(即计数器大于零)。
  • release():释放信号量,将计数器增加一个单位。释放操作通常代表释放了对共享资源的使用权,使其他线程可以访问该资源。

生产者—消费者问题

生产者消费者问题是并发编程中的一个经典问题,涉及到两类进程,即生产者和消费者,它们通过共享的缓冲区进行通信。生产者的任务是生成数据,放入缓冲区;消费者则从缓冲区取出数据进行处理。问题的关键在于确保当缓冲区为空时,消费者会等待(阻塞)直到有数据可取,同样,当缓冲区已满时,生产者也会等待直到缓冲区有空位可用来放置新的数据。

import threading
import time
import queue

# 设定缓冲区大小为 5
buffer = queue.Queue(maxsize=5)

# 创建信号量,初始化为缓冲区大小
empty_slots = threading.Semaphore(5)
filled_slots = threading.Semaphore(0)

def producer():
    while True:
        item = "Product"
        empty_slots.acquire()  # 等待空位

        buffer.put(item)
        print("Producer produced:", item)

        filled_slots.release()  # 发布填充信号
        time.sleep(1)

def consumer():
    while True:
        filled_slots.acquire()  # 等待产品

        item = buffer.get()
        print("Consumer consumed:", item)

        empty_slots.release()  # 释放空位信号
        time.sleep(1)

if __name__ == '__main__':
    # 创建生产者和消费者线程
    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)

    # 启动线程
    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    consumer_thread.join()

代码示例中,empty_slotsfilled_slots 信号量分别代表缓冲区空位和填充位的数量。生产者在生产前检查是否有空位可用(通过 empty_slots.acquire() 方法),消费者在消费前确认是否有产品可消费(通过 filled_slots.acquire() 方法)。这种信号量同步机制确保了生产者和消费者之间对共享资源(即缓冲区)访问的平衡与协调。

事件 (Event) 通信机制

事件(Event)是一种线程同步机制,允许一个或多个线程等待某些事件的发生。在 Python 的 threading 模块中,Event 对象提供了简单的状态标志,可以被线程设置(signal)或清除(clear),线程通过等待事件的设置来阻塞其执行,直到另一个线程触发该事件。

Event 对象主要提供以下方法:

  • set():设置事件状态为 ‘True’,唤醒所有等待该事件的线程。
  • clear():重置事件状态为 ‘False’。
  • is_set():检查事件状态,返回 True 如果事件被设置,否则返回 False
  • wait(timeout=None):阻塞线程直到事件的状态变为 ‘True’。timeout 参数指定等待的最长时间,如果省略则无限期等待。

解决生产者—消费者问题

import threading
import time

# 初始化事件
data_available = threading.Event()
data_consumed = threading.Event()

def producer():
    global data_storage
    for i in range(5):  # 生产者生成5次数据
        time.sleep(1)  # 模拟数据生产的时间
        data_storage = f"Product-{i + 1}"
        print(f"Producer produced: {data_storage}")
        data_available.set()  # 通知消费者数据已准备好
        data_consumed.wait()  # 等待消费者消费数据
        data_consumed.clear()  # 重置事件以供下次使用

def consumer():
    global data_storage
    for _ in range(5):  # 消费者消费5次数据
        data_available.wait()  # 等待生产者生产数据
        print(f"Consumer consumed: {data_storage}")
        data_storage = None  # 模拟数据消费
        data_available.clear()  # 重置事件以供下次生产
        data_consumed.set()  # 通知生产者数据已消费

if __name__ == "__main__":
    # 初始化数据存储和事件
    data_storage = None
    data_consumed.set()  # 初始状态,允许生产者先开始

    # 创建并启动生产者和消费者线程
    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)

    producer_thread.start()
    consumer_thread.start()

    # 等待线程结束
    producer_thread.join()
    consumer_thread.join()

本示例中,我们使用事件(Event)来控制生产和消费的同步,设置一个生产者和一个消费者,通过两个事件互相协调:data_available 用于通知消费者数据已经准备好,而 data_consumed 通知生产者数据已被消费,可以继续生产新的数据,确保了生产者和消费者的操作是同步的,并且在每次数据生产和消费之间都维持了恰当的协调。

4、线程安全队列:协调线程通讯

queue 模块为多线程编程提供了三种核心队列类型——FIFO(先入先出)队列 Queue,LIFO(后入先出)队列 LifoQueue,以及优先级队列 PriorityQueue。为不同的数据处理需求提供解决方案,使用它们可以大大简化线程间的数据交换和同步流程。

队列的基本操作

队列在多线程编程中提供了一系列关键操作,使得线程间的数据同步和交流变得非常高效和安全:

  • qsize():返回队列中的元素数量,不过在多线程情况下,这个数值只是近似的。
  • empty():如果队列为空返回 True,否则返回 False。因为并发性,应谨慎使用。
  • full():检查队列是否已满,满了返回 True,没有满返回 False。其行为由初始化时的 maxsize 参数决定,maxsize=0 代表队列无容量限制。
  • get([block[, timeout]]):移除并返回队列的第一个元素。blocktimeout 参数分别决定是否阻塞及阻塞时间,用于队列为空时的处理。
  • put(item[, block[, timeout]]):向队列添加一个元素。同样,blocktimeout 参数用于队列已满时的行为控制。
  • task_done():标记队列中的一个任务为完成状态。与 join() 方法配合,实现等待所有任务完成的功能。
  • join():使调用线程等待,直到队列中的所有任务被处理。

阻塞与非阻塞模式

  • 阻塞模式:默认情况下,如果队列为空,get() 方法会使线程等待直到队列中有元素可取;类似地,如果队列已满,put() 方法会使线程等待直到队列有空间放入新元素。
  • 非阻塞模式:通过 get_nowait()put_nowait() 方法(即设置 get(block=False)put(block=False)),可以在队列满或空的情况下立刻引发异常(queue.Fullqueue.Empty),避免等待。

三种类型的队列

FIFO 队列 (Queue)

Queue 类实现了标准的 FIFO 策略,适用于大多数需要按顺序处理数据的场合。

from queue import Queue

q = Queue()
for i in range(5):
    q.put(i)
while not q.empty():
    print(f'FIFO output: {q.get()}')

LIFO 队列 (LifoQueue)

LifoQueue 实现了类似于栈的数据结构,后进的元素会被先处理。

from queue import LifoQueue

q = LifoQueue()
for i in range(5):
    q.put(i)
while not q.empty():
    print(f'LIFO output: {q.get()}')

优先级队列 (PriorityQueue)

PriorityQueue 允许元素以定义的优先级顺序被处理。优先级较高的元素(较小的优先级数值)将先出队。

from queue import PriorityQueue

q = PriorityQueue()
q.put((2, 'Medium priority'))
q.put((1, 'High priority'))
q.put((3, 'Low priority'))
while not q.empty():
    print(f'Priority output: {q.get()[1]}')

5、理解全局解释器锁(GIL)

GIL,全名为 Global Interpreter Lock,是 Python 解释器 CPython 为了确保线程安全而引入的一种锁机制。它确保任何时刻,只有一个线程可以在解释器中执行 Python 字节码。这意味着,即使在多核处理器上,Python 程序也不能实现真正意义上的多线程并行执行。GIL 的存在主要是为了简化复杂性和避免与 Python 内存管理相关的并发冲突。

对多线程的影响

在单核处理器环境下,多线程执行的任务需要通过时间切片来模拟并行执行,但在多核处理器中,理想情况下可以实现真正的并行。然而,由于 GIL 的存在,Python 的多线程程序无法在多核处理器上并行执行。主要受到影响的是计算密集型任务,因为这些任务中线程频繁竞争 GIL,限制了多线程的效能。相反,I/O 密集型任务,如文件读写操作,因为线程在等待 I/O 操作完成时会释放 GIL,从而减少了对 GIL 的影响。

克服 GIL 的策略

  • 多进程:使用 multiprocessing 模块创建进程,每个进程拥有自己的 Python 解释器和独立的 GIL。这样,各进程可以并行运行在多核处理器上。
  • 特定库:一些如 NumPy 等进行数学运算的库能够绕过 GIL,这是因为它们在执行计算密集型任务时释放了 GIL。
  • 其他解释器:选择不同于 CPython 的 Python 解释器,如 Jython 和 IronPython,这些解释器没有 GIL,从而可以实现真正的多线程并行。

GIL 的版本差异

  • Python 2.x:GIL 的释放策略基于操作的次数,每执行一定数量的指令后强制释放 GIL,允许其他线程执行。
  • Python 3.x:改进了 GIL 的管理策略,使用基于时间的竞争机制释放 GIL,改善了 CPU 密集型应用的性能,但并没有移除 GIL。
Python 多线程 GIL 同步 _thread threading 信号量 全局解释器锁