多线程

多线程#

本文用一个例子来引入多线程,稍微有一些难度,需要花一些时间理解。难点在于如何使用 CTRL-C 终止多线程:这是因为多线程运行时,CTRL-C 只能终止主线程,而不能终止子线程。另外需要注意的是,如果你用到了 proc = subprocess.Popen() 函数,那么 CTRL-C 发出的 SIGINT 信号同样无法被 proc 接收到,你需要单独用 proc.terminate() 函数来终止由 subprocess 创建的进程。

问题描述#

目标:使用 Python 实现一个简单的问答模型,它的运行逻辑遵循下面的规则:

  1. 使用两个线程 Thread-Q(提问题的线程) 和 Thread-A(回答问题的线程)。

  2. Thread-A 阻塞等待 Thread-Q 提出问题。

  3. Thread-Q 提出问题,阻塞等待 Thread-A 回答。

  4. Thread-A 回答 Thread-Q 的问题,然后阻塞等待 Thread-Q 继续提问。

  5. Thread-Q 打印 Thread-A 的回答,然后提出新的问题,阻塞等待 Thread-A 回答。

  6. 重复步骤 4 和步骤 5。

具体实现#

import queue
import signal
import threading

exitFlag = False
stopFlag = False

threadList = ["Thread-Q", "Thread-A"]
threads = []

questionQueueLock = threading.Lock()
questionQueue = queue.Queue(10)
answerQueue = queue.Queue(10)
questionList = [
    "What is your name?",
    "Where are you from?",
    "How's the weather today?",
    "What day is it today?",
    "Do you like fruits?",
]
answerList = ["Sam", "China", "Sunny", "Friday", "Yes"]
for answer in answerList:
    answerQueue.put(answer)


# 多线程的 CTRL-C 事件需要单独处理,才能让多线程正常退出
def interrupt_handler(signum, frame):
    global stopFlag
    stopFlag = True  # stop child thread
    print("receive a signal %d, stopFlag = %d" % (signum, stopFlag))


class myThread(threading.Thread):
    def __init__(self, threadId, name, q):
        threading.Thread.__init__(self)
        self.threadId = threadId
        self.name = name
        self.q = q

    def run(self):
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)


def process_data(threadName, q):
    while not exitFlag and not stopFlag:
        questionQueueLock.acquire()
        if threadName == "Thread-Q" and not questionQueue.empty():
            question = q.get()
            questionQueueLock.release()
            question_answered.wait()  # P(a)
            question_answered.clear()
            print("Thread-Q: " + question)
            question_asked.set()  # V(q)
        elif threadName == "Thread-A" and not answerQueue.empty():
            questionQueueLock.release()
            question_asked.wait()  # P(q)
            question_asked.clear()
            print("Thread-A: " + answerQueue.get())
            question_answered.set()  # V(a)
        else:
            questionQueueLock.release()


if __name__ == "__main__":
    signal.signal(signal.SIGINT, interrupt_handler)
    # 创建同步事件
    question_asked = threading.Event()
    question_answered = threading.Event()
    question_answered.set()  # 让 question_asked 先运行,破除死锁

    # 创建新线程
    threadId = 1
    for tName in threadList:
        thread = myThread(threadId, tName, questionQueue)
        thread.daemon = True
        thread.start()
        threads.append(thread)
        threadId += 1

    # 填充队列
    questionQueueLock.acquire()
    for question in questionList:
        questionQueue.put(question)
    questionQueueLock.release()

    # 主线程退出逻辑
    while True:
        # 问题队列和答案队列全部为空,会导致子线程相继退出
        if questionQueue.empty() and answerQueue.empty():
            exitFlag = True

        # 子线程全部退出 alive = False
        alive = False
        for thread in threads:
            alive = alive or thread.is_alive()
        if not alive:
            break
    print("Exiting Main Thread")
Starting Thread-Q
Starting Thread-A
Thread-Q: What is your name?
Thread-A: Sam
Thread-Q: Where are you from?
Thread-A: China
Thread-Q: How's the weather today?
Thread-A: Sunny
Thread-Q: What day is it today?
Thread-A: Friday
Thread-Q: Do you like fruits?
Thread-A: Yes
Exiting Thread-Q
Exiting Thread-A
Exiting Main Thread