多线程#
本文用一个例子来引入多线程,稍微有一些难度,需要花一些时间理解。难点在于如何使用 CTRL-C 终止多线程:这是因为多线程运行时,CTRL-C 只能终止主线程,而不能终止子线程。另外需要注意的是,如果你用到了 proc = subprocess.Popen()
函数,那么 CTRL-C 发出的 SIGINT
信号同样无法被 proc
接收到,你需要单独用 proc.terminate()
函数来终止由 subprocess
创建的进程。
问题描述#
目标:使用 Python 实现一个简单的问答模型,它的运行逻辑遵循下面的规则:
使用两个线程
Thread-Q
(提问题的线程) 和Thread-A
(回答问题的线程)。Thread-A
阻塞等待Thread-Q
提出问题。Thread-Q
提出问题,阻塞等待Thread-A
回答。Thread-A
回答Thread-Q
的问题,然后阻塞等待Thread-Q
继续提问。Thread-Q
打印Thread-A
的回答,然后提出新的问题,阻塞等待Thread-A
回答。重复步骤 4 和步骤 5。
具体实现#
import queue
import signal
import threading
exitFlag = False
stopFlag = False
threadList = ["Thread-Q", "Thread-A"]
threads = []
questionQueueLock = threading.Lock()
questionQueue = queue.Queue(10)
answerQueue = queue.Queue(10)
questionList = [
"What is your name?",
"Where are you from?",
"How's the weather today?",
"What day is it today?",
"Do you like fruits?",
]
answerList = ["Sam", "China", "Sunny", "Friday", "Yes"]
for answer in answerList:
answerQueue.put(answer)
# 多线程的 CTRL-C 事件需要单独处理,才能让多线程正常退出
def interrupt_handler(signum, frame):
global stopFlag
stopFlag = True # stop child thread
print("receive a signal %d, stopFlag = %d" % (signum, stopFlag))
class myThread(threading.Thread):
def __init__(self, threadId, name, q):
threading.Thread.__init__(self)
self.threadId = threadId
self.name = name
self.q = q
def run(self):
print("Starting " + self.name)
process_data(self.name, self.q)
print("Exiting " + self.name)
def process_data(threadName, q):
while not exitFlag and not stopFlag:
questionQueueLock.acquire()
if threadName == "Thread-Q" and not questionQueue.empty():
question = q.get()
questionQueueLock.release()
question_answered.wait() # P(a)
question_answered.clear()
print("Thread-Q: " + question)
question_asked.set() # V(q)
elif threadName == "Thread-A" and not answerQueue.empty():
questionQueueLock.release()
question_asked.wait() # P(q)
question_asked.clear()
print("Thread-A: " + answerQueue.get())
question_answered.set() # V(a)
else:
questionQueueLock.release()
if __name__ == "__main__":
signal.signal(signal.SIGINT, interrupt_handler)
# 创建同步事件
question_asked = threading.Event()
question_answered = threading.Event()
question_answered.set() # 让 question_asked 先运行,破除死锁
# 创建新线程
threadId = 1
for tName in threadList:
thread = myThread(threadId, tName, questionQueue)
thread.daemon = True
thread.start()
threads.append(thread)
threadId += 1
# 填充队列
questionQueueLock.acquire()
for question in questionList:
questionQueue.put(question)
questionQueueLock.release()
# 主线程退出逻辑
while True:
# 问题队列和答案队列全部为空,会导致子线程相继退出
if questionQueue.empty() and answerQueue.empty():
exitFlag = True
# 子线程全部退出 alive = False
alive = False
for thread in threads:
alive = alive or thread.is_alive()
if not alive:
break
print("Exiting Main Thread")
Starting Thread-Q
Starting Thread-A
Thread-Q: What is your name?
Thread-A: Sam
Thread-Q: Where are you from?
Thread-A: China
Thread-Q: How's the weather today?
Thread-A: Sunny
Thread-Q: What day is it today?
Thread-A: Friday
Thread-Q: Do you like fruits?
Thread-A: Yes
Exiting Thread-Q
Exiting Thread-A
Exiting Main Thread