进程
进程(Process)是资源分配的最小单位,也是线程的容器。
进程的状态:
- 新建
- 就绪
- 运行
- 等待
- 死亡
import time
import multiprocessing
def work1():
for i in range(10):
print(f"正在运行work1... {i}")
time.sleep(0.5)
if __name__ == '__main__':
process_obj = multiprocessing.Process(target=work1)
process_obj.start()
进程名称、ID
- 获取进程的名称: multiprocessing.current_process()
- 设置名称:
- multiprocessing.Process(target=xxxx, name="进程名称")
- 获取进程的编号
-
- multiprocessing.current_process().pid
-
- os.getpid()
-
- 获取进程的父id: os.getppid()
- 结束进程: Kill -9 进程的编号
import time
import multiprocessing
import os
def work1():
for i in range(10):
print(f"正在运行work1... ", multiprocessing.current_process())
print(f"正在运行work1... ", multiprocessing.current_process().pid)
time.sleep(0.5)
if __name__ == '__main__':
# 获取主进程名称
print(multiprocessing.current_process())
# 获取进程的编号
print(multiprocessing.current_process().pid)
process_obj = multiprocessing.Process(target=work1, name="test")
process_obj.start()
# 使用OS模块来获取进程PID
print(os.getpid())
# 获取进程的父id
print(os.getppid())
# <_MainProcess name='MainProcess' parent=None started>
# 881620
# 正在运行work1... <Process name='test' parent=881620 started>
# 正在运行work1... 871052
# 正在运行work1... <Process name='test' parent=881620 started>
# 正在运行work1... 871052
# kill 885968 结束进程
进程-参数传递、全局变量问题
- 进程的参数传递
- args 元组 # process_obj = multiprocessing.Process(target=work1, args=(10, 100,1000))
- kwargs 字典 # process_obj = multiprocessing.Process(target=work1, kwargs={"c": 1000, "a": 10, "b": 100})
- 混合args 和 kwargs # process_obj = multiprocessing.Process(target=work1, args=(10, ), kwargs={"c": 1000, "b": 100})
- 进程间共享全局变量的问题
- 进程间是不能够共享全局变量
- 底层原理:子进程会复制主进程的资源到内部运行
import time
import multiprocessing
def work1(a, b, c):
for i in range(10):
print(f"正在运行work1... {a}-{b}-{c}")
time.sleep(0.5)
if __name__ == '__main__':
# process_obj = multiprocessing.Process(target=work1, args=(10, 20, 30))
# process_obj = multiprocessing.Process(target=work1, kwargs={"a": 10, "b": 20, "c": 30})
process_obj = multiprocessing.Process(target=work1, args=(10, ), kwargs={"c": 30, "b": 20})
process_obj.start()
# 正在运行work1... 10-20-30
# 正在运行work1... 10-20-30
import multiprocessing
import time
# 定义全局变量
g_num = 10
def work1():
global g_num
for i in range(10):
g_num += 1
print(f"work1 g_num:", g_num)
def work2():
time.sleep(3)
print(f"work2 g_num:", g_num)
if __name__ == '__main__':
process_obj1 = multiprocessing.Process(target=work1)
process_obj2 = multiprocessing.Process(target=work2)
process_obj1.start()
process_obj2.start()
print(f"main g_num:", g_num)
# main g_num: 10
# work1 g_num: 20
# work2 g_num: 10
进程-守护主进程
- 进程守护: 子进程和主进程的一种约定,当主进程结束的时候,子进程也随之结束
- process_obj.daemon = True
- 结束子进程: terminate() 终止子进程的执行
- process_obj.terminate()
import time
import multiprocessing
def work1():
for i in range(10):
print(f"正在运行work1... {i}")
time.sleep(0.5)
if __name__ == '__main__':
process_obj = multiprocessing.Process(target=work1)
process_obj.daemon = True
process_obj.start()
# 终止子线程
process_obj.terminate()
# 主进程睡眠
time.sleep(2)
print("主进程即将关闭")
exit()
进程、线程对比
- 进程和线程的对比
- 进程是资源分配的基本单位,线程是CPU调度的基本单位
- 进程运行需要独立的内存资源, 线程只需要必不可少的一点运行资源
- 线程与同属一个进程的其他线程共享进程所拥有的全部资源
- 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高程序的运行效率
- 一个程序至少有一个进程,一个进程至少有一个线程
- 进程切换慢,线程切换更快
- 线程不能独立运行,必须运行在进程中(进程能提供资源)
- CPU密集型 进程优先, I/O 密集型使用线程
- 程序更稳定进程,线程相比较不够稳定
- 不是非此即彼,而是组合使用
进程-消息队列基本操作
可以使用multiprocessing模块的Queue实现多进程之间的数据传递
- 消息队列的学习的目的:为了实现进程间的通信
- 队列的创建:
- 导入模块 multiprocessing
- multiprocessing.Queue(5) # 队列长度为5,不指定长度则没有上限
- 队列的操作:
- 放入值 queue.put(值) -->从队列尾部放入值
- 取值: queue.get() --> 从队列头部取值
- xxxx_nowait() 方式
- 放入值 put_nowait() 特点:队列未满,同put() ,但是队列已满,会报错,不等待
- 取值 get_nowait() 特点:队列未空,同get() ,但是队列已空,会报错,不等待
import multiprocessing
# 创建队列 multiprocessing.Queue(n) n表示队列长度
queue = multiprocessing.Queue(5)
# 向队列中放入值 queue.put(值)
queue.put(1)
queue.put("hello")
queue.put([1, 2, 3])
queue.put((4, 5, 6))
queue.put({"a": 10, "b": 20, "c": 30})
# queue.put("test") # 长度为5,放入第6个值后,队列就进入了阻塞状态,默认会等待队列先取出值再放入新的值
# queue.put_nowait("test") # 队列已满不再等待,会直接报错 queue.Full
print(queue.get())
print(queue.get())
print(queue.get())
print(queue.get())
print(queue.get())
# print(queue.get()) # 当队列已空后,再取值时会进入等待状态
# print(queue.get_nowait()) # 当队列已空后,queue.get_nowait() 不会等待放入新的值,直接报错
1
hello
[1, 2, 3]
(4, 5, 6)
{'a': 10, 'b': 20, 'c': 30}
进程-消息队列常见判断
- 常见的判断
- full() 判断是否已满
- empty() 判断是否为空
- 取出队列中消息的数量
- qsize()
import multiprocessing
# 创建队列 multiprocessing.Queue(n) n表示队列长度
queue = multiprocessing.Queue(5)
# 判断是否已满
print(queue.full())
# 判断是否为空
print(queue.empty())
# 向队列中放入值 queue.put(值)
queue.put(1)
queue.put("hello")
queue.put([1, 2, 3])
queue.put((4, 5, 6))
queue.put({"a": 10, "b": 20, "c": 30})
# 判断队列中消息的数量
print(queue.qsize())
# 判断是否已满
print(queue.full())
# 取出一个字
queue.get()
# 判断队列中消息的数量
print(queue.qsize())
# 判断是否已满
print(queue.full())
False
True
5
True
0
False
进程-Queue实现进程间通信
- 思路:利用队列在两个进程间进行传递,进而实现数据共享
- write_queue(queue)
- read_queue(queue)
- 创建一个空队列
- 把空队列作为参数,先把队列传递给写进程,然后把队列再传递给读进程
- join()优先让一个进程先执行完成,另外一个进程才能*
import multiprocessing
import time
# 写入数据到队列的函数
def write_queue(queue):
for i in range(10):
if queue.full():
print("队列已满")
break
queue.put(i)
print("写入", i)
time.sleep(0.5)
# 读取队列数据并显示的函数
def read_queue(queue):
while True:
if queue.qsize() == 0:
print("队列已空")
break
print(queue.get())
if __name__ == '__main__':
# 创建一个空队列
queue = multiprocessing.Queue(5)
#创建两个进程,分别写数据、读数据
write_queue_process = multiprocessing.Process(target=write_queue, args=(queue, ))
read_queue_queue_process = multiprocessing.Process(target=read_queue, args=(queue, ))
write_queue_process.start()
# 优先让写进程执行结束后再读取数据
write_queue_process.join()
read_queue_queue_process.start()
进程池
- 进程池:是一个进程的容器,可以自动帮我们创建指定数量的进程,并且管理进程及工作
- 创建方法:
- 导入模块
- 创建进程池 pool = multiprocessing.Pool(3)
- 工作方式:
- 同步方式: pool.apply(函数名, (参数1, 参数2, ....)) 进程池中的进程,一个执行完毕后另外一个才能执行,多个进程执行有先后顺序
- 异步方式: pool.apply_async(函数名, (参数1, 参数2, ....)) 进程池中的进程,多个进程同时执行,没有先后顺序
- 进程池要 close() pool.close() 表示不再接受新的任务
- 还要join() pool.join() 表示让主进程等待进程池执行结束后再退出
# 同步方式
import multiprocessing
import time
def copy_work(file):
print("正在拷贝文件。。。", multiprocessing.current_process())
time.sleep(0.5)
if __name__ == '__main__':
# 创建进程池,最大允许创建2个进程
pool = multiprocessing.Pool(2)
for i in range(10):
pool.apply(copy_work, ("file", ))
# 正在拷贝文件。。。 <SpawnProcess name='SpawnPoolWorker-1' parent=184836 started daemon>
# 正在拷贝文件。。。 <SpawnProcess name='SpawnPoolWorker-2' parent=184836 started daemon> ......
# 异步方式 需要做2点
# 1、此进程池需要close表示不再接受新的任务,
# 2、主进程不再等待进程池执行结束后再退出,需要进程池join(),让主进程等待进程池执行结束后再退出
import multiprocessing
import time
def copy_work(file):
print("正在拷贝文件。。。", multiprocessing.current_process())
time.sleep(0.5)
if __name__ == '__main__':
# 创建进程池,最大允许创建2个进程
pool = multiprocessing.Pool(2)
for i in range(10):
pool.apply_async(copy_work, ("file", ))
pool.close()
pool.join()
# 多个进程同时执行,没有先后顺序
Queue实现进程间通信
- apply_async() 返回值 ApplyResult对象,该对象由一个 wait() 的方法
- wait() 方法类似join() 表示先让当前进程执行完毕,后续进程才能启动
import multiprocessing
import time
# 写入数据到队列的函数
def write_queue(queue):
for i in range(10):
if queue.full():
print("队列已满")
break
queue.put(i)
print("写入", i)
time.sleep(0.5)
# 读取队列数据并显示的函数
def read_queue(queue):
while True:
if queue.qsize() == 0:
print("队列已空")
break
print(queue.get())
if __name__ == '__main__':
# 创建进程池
pool = multiprocessing.Pool(2)
# 创建进程池中的队列
queue = multiprocessing.Manager().Queue(5)
# 使用进程池执行任务
# 同步方式
# pool.apply(write_queue, (queue, ))
# pool.apply(read_queue, (queue, ))
# 异步方式
# apply_async() 返回值 ApplyResult对象,该对象由一个 wait() 的方法
# wait() 方法类似join() 表示先让当前进程执行完毕,后续进程才能启动
result = pool.apply_async(write_queue, (queue, ))
result.wait()
pool.apply_async(read_queue, (queue, ))
# close()表示不再接收新的任务
pool.close()
# 主进程会等待进程池执行结束后再退出
pool.join()
案例-文件夹copy器(多进程版)
"""
./test/ ---------> test2/
思路:
1、定义变量,保存源文件夹、目标文件夹所在的路径
2、在目标路径创建新的文件夹
3、获取源文件夹中的所有的文件(列表)
4、遍历列表,得到所有的文件名
5、定义函数,进行文件拷贝
文件拷贝函数:
参数: 源文件夹路径 目标文件夹路径 文件名
1、拼接源文件和目标文件的具体路径
2、打开源文件,创建目标文件
3、读取源文件的内容,写入到目标文件中 (while)
"""
import os
import multiprocessing
def copy_work(source_dir, dest_dir, file_name):
"""根据参数,拷贝文件"""
print(multiprocessing.current_process())
# 1、拼接源文件和目标文件的具体路径
# ./test/1.txt ----> /home/demo/Desktop/test/1.txt
# 2、打开源文件,创建目标文件
source_path = source_dir + "/" + file_name
dest_path = dest_dir + "/" + file_name
# print(source_path,"------>" ,dest_path)
# 3、读取源文件的内容,写入到目标文件中 (while )
with open(source_path, "rb") as source_file:
# 创建目标文件
with open(dest_path, "wb") as dest_file:
while True:
# 读源文件,保存到目标文件
file_data = source_file.read(1024)
# 判断文件是否读取完成
if file_data:
dest_file.write(file_data)
else:
break
if __name__ == '__main__':
# 1、定义变量,保存源文件夹、目标文件夹所在的路径
source_dir = "test"
dest_dir = "test2"
# 2、在目标路径创建新的文件夹
# os.mkdir() 在指定位置创建文件夹
try:
os.mkdir(dest_dir)
except Exception as e:
print("文件夹已经存在!")
# 3、获取源文件夹中的所有的文件(列表)
file_list = os.listdir(source_dir)
# 创建进程池
pool = multiprocessing.Pool(3)
# 4、遍历列表,得到所有的文件名
for file_name in file_list:
# print(file_name)
# 5、定义函数,进行文件拷贝
# copy_work(source_dir, dest_dir, file_name)
pool.apply_async(copy_work, (source_dir, dest_dir, file_name))
# close() 不再接收新的任务
pool.close()
# 让主进程等待进程池结束后在退出
pool.join()