Python多线程、多进程
知识点
多线程
备注:
- 多线程有两种应用场景,第一种是直接调用Thread()的方法,适用于实现过程不复杂的情景。第二种通过继承Thread()类,然后重写run()方法来实现,适用于实现过程复杂的情景
- 多线程是共享全局变量的
第一种方法:适用于单一方法实现功能的情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import threading
import time
def say(message):
for i in range(5):
print("第{0}次说话,说话内容:{1}".format(i, message))
time.sleep(1)
def dance():
for i in range(5):
print("第{0}次跳舞".format(i))
time.sleep(1)
if __name__ == '__main__':
# 线程的执行是没有顺序的,由系统决定
t1 = threading.Thread(target=say, args = (message,)) # args传递的是函数运行所需要的变量值
t2 = threading.Thread(target=dance) # 此处只是创建一个子线程对象,仍然属于主线程中
t1.start() # 运行了start以后才会创建子线程,并让这个子线程运行起来
t2.start()
while True:
print(threading.enumerate()) # 利用enumerate方法可以直接打印出列表、元组等元素的元组格式数据
if len(threading.enumerate()) <= 1:
break第二种方法:适用于多方法实现功能的情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23import threading
import time
class Thread_Test(threading.Thread):
# 通过继承类,则一定要重写run方法才可以
def run(self):
for i in range(5):
self.talk() # 该处的talk和speak方法会同步执行
self.speak()
time.sleep(1)
def talk(self):
print("i talk le ")
def speak(self):
print("i speak le ")
if __name__ == '__main__':
test = Thread_Test()
# 多线程本质上是调用Thread类中的run方法,所以此处使用start方法
test.start()多线程问题1:造成资源竞争(对同一个数据进行写入,导致数据不准确),
解决方案:当某个线程需要更改共享数据时,先将共享的资源进行锁定,此时其它线程不能更改,直到该线程释放资源,将资源的状态变为非锁定,其它线程才可以再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的安全性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import threading
import time
NUM = 100
mutex = threading.Lock() # 创建互斥锁,初始未锁定状态
def say(message):
globe NUM # 不可变类型的全局变量使用前需要先声明
for i in range(message):
# 锁定数据的要求:锁定的代码越少越好,
mutex.acquire() # 开始锁定数据
NUM += i
mutex.release() # 释放互斥锁
def dance(message)
globe NUM
for i in range(message):
mutex.acquire() # 开始锁定数据
NUM += i
mutex.release() # 释放互斥锁
if __name__ == '__main__':
# 线程的执行是没有顺序的,由系统决定
t1 = threading.Thread(target=say, args = (message,)) # args传递的是函数运行所需要的变量值
t2 = threading.Thread(target=dance) # 此处只是创建一个子线程对象,仍然属于主线程中
t1.start() # 运行了start以后才会创建子线程,并让这个子线程运行起来
t2.start()多线程问题2:死锁(线程之间共享多个资源,若两个线程之间分别占有一部分资源并且同时等待对方的资源,就会造成死锁)
解决方案1:优化设计算法(银行家算法)
解决方案2:添加超时时间
多进程
备注
- 可执行文件未运行前叫程序,运行起来以后叫进程。程序是没有资源的(内存、硬件调用),进程是有资源的(内存、软硬件等),进程是程序运行以后代码与所占用资源的综合
- 创建子进程以后,主进程和子进程的代码、资源都是独立且完全一样的
通过Process()类创建子进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import multiprocessing
from time import sleep
def test1(num):
for i in range(100):
print("test1: {0}, talk: {1}".format(i, num))
sleep(1)
def test2():
for i in range(100):
print("test2: {0}".format(i))
sleep(1)
def main():
# 多进程的实现原理与多线程类似,只是多进程之间的资源都是独立的
p1 = multiprocessing.Process(target=test1, args=(9,))
p2 = multiprocessing.Process(target=test2)
p1.start()
p2.start()
if __name__ == '__main__':
main()多进程问题:资源不共享导致多进程之间无法进行通信
解决方案1:通过本地存储的文件进行交互
解决方案2:通过multiprocess中Queue()类(队列)实现数据的交互。queue实现原理是在内存中开辟一部分空间,将需要交互的数据存储在内存中,当其它进程需要交互数据时就从内存中queue开辟的空间中读取,谁先存入,在读取时就优先读取出来。queue的存在使得多进程中耦合度降低(解耦,耦合度越高,程序越不好,反之更好)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38import multiprocessing
from time import sleep
def put_data(queue_list):
list_data = [1, 2, 3] * 50
for num in list_data:
queue_list.put(num) # 向队列中追加元素,当队列元素满载时,此时会堵塞在此,直到队列中可以给put
print("put_data: 上传元素:{0}".format(num))
def get_data(queue_list):
result = 0
while True:
# empty(): 判断队列中是否是空队列,空队列返回True
# full():判断队列是否已经满了,满队列返回True
if not queue_list.empty():
sleep(1)
num = queue_list.get() # 从队列提取元素,当队列元素为空时,此时会堵塞在此,直到队列中可以提取到元素
# num = queue_list.get_nowait() # 即时提取队列元素,不等待,若提取不到元素则报错
print("get-data: 上传元素:{0}".format(num))
result += num
else:
break
print("所有元素的和为{0}".format(result))
def main():
# 主线程中创建队列,然后将队列作为参数传入到子进程中,若不填队列数,则系统会根据内存自动分配
queue_list = multiprocessing.Queue(3) # 3表示队列中元素最多为3个,满载时无法put,队列为空时无法get
p1 = multiprocessing.Process(target=put_data, args=(queue_list,))
p2 = multiprocessing.Process(target=get_data, args=(queue_list,))
p1.start()
p2.start()
if __name__ == '__main__':
main()
进程池
背景
- 当需要创建的子进程数量不多时,可以直接利用multiprocessing中Process动态创建,但如果是上百个甚至是上千个目标(进程创建和销毁需要较多的工作量和资源),此时就需要用multiprocessing模块中的Pool方法
实现方式
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool时,如果池还没满,那么就会创建一个新的进程用于执行该请求;但如果进程池的进程数已经达到指定的最大数,那么该请求就会等待,直到进程池有进程结束,才会用之前的进程来执行新的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import multiprocessing
import time
import os
import random
def run_back(msg):
start_time = time.time()
# os.getpid() 可以获取当前程序的进程号
print("执行进程为{0}, 进程号为{1}".format(msg, os.getpid()))
time.sleep(random.randint(1, 3))
end_time = time.time()
print("执行完毕,耗时:{0}".format(start_time - end_time))
def main():
# 此处定义一个进程池,和定义队列类似,数字3表示允许的进程池的最大数量为3
po = multiprocessing.Pool(3)
for i in range(100):
# Pool().apply_async(调用的目标, (传递给目标的参数元组,))
# 每次循环将会用空闲出来的进程(不会终止)去调用目标
po.apply_async(run_back, (i,)) # 此处开始创建进程池中的进程,并自动调度进程池的进程
print("----------start---------")
po.close() # 关闭进程池,关闭后po不再接受新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
print("----------end-----------")
if __name__ == '__main__':
main()注意点
- 通过进程池创建的子进程和主进程彼此独立开,当创建子进程的语句全部执行完毕后,主进程依旧继续从上到下执行,不会等待子进程执行完·毕。如果主进程比子进程先执行完毕,那么程序照样结束,导致逻辑不对,所以必须加入Pool().join(),用于堵塞主线程,等待子线程全部执行完毕
- 进程池中的子进程异常的时候,是不会将报错信息打印出来的
创建文件复制器
1 | import os |
作业
打印出当前线程中所有同时在线的线程信息
利用继承多线程类的方式,实现以下两个方法的多线程同步
1
2
3
4
5def talk(self):
print("i talk le ")
def speak(self):
print("i speak le ")(简答)线程是共享全局资源的,那么如何利用互斥锁保障多线程中的资源竞争现象
利用多进程和queue队列实现进程之间的数据交互,需求如下:
- 向一个列表中追加数据的同时,也自动的从列表中读取数据,并打印出来
(问答)请简述利用进程池实现多进程的步骤
(问答)使用进程池时的注意事项
(问答)当多进程出现异常时,是否会打印出错误信息