7.进程

进程

进程(Process)是资源分配的最小单位,也是线程的容器。

进程的状态:

  • 新建
  • 就绪
  • 运行
  • 等待
  • 死亡
import time
import multiprocessing

def work1():
    for i in range(10):
        print(f"正在运行work1... {i}")
        time.sleep(0.5)

if __name__ == '__main__':
    process_obj = multiprocessing.Process(target=work1)
    process_obj.start()

进程名称、ID

  • 获取进程的名称: multiprocessing.current_process()
  • 设置名称:
    • multiprocessing.Process(target=xxxx, name="进程名称")
  • 获取进程的编号
      1. multiprocessing.current_process().pid
      1. os.getpid()
  • 获取进程的父id: os.getppid()
  • 结束进程: Kill -9 进程的编号
import time
import multiprocessing
import os

def work1():
    for i in range(10):
        print(f"正在运行work1... ", multiprocessing.current_process())
        print(f"正在运行work1... ", multiprocessing.current_process().pid)
        time.sleep(0.5)

if __name__ == '__main__':
    # 获取主进程名称
    print(multiprocessing.current_process())
    # 获取进程的编号
    print(multiprocessing.current_process().pid)
    process_obj = multiprocessing.Process(target=work1, name="test")
    process_obj.start()

    # 使用OS模块来获取进程PID
    print(os.getpid())
    # 获取进程的父id
    print(os.getppid())

# <_MainProcess name='MainProcess' parent=None started>
# 881620
# 正在运行work1...  <Process name='test' parent=881620 started>
# 正在运行work1...  871052
# 正在运行work1...  <Process name='test' parent=881620 started>
# 正在运行work1...  871052

#  kill 885968    结束进程

进程-参数传递、全局变量问题

  • 进程的参数传递
    • args 元组 # process_obj = multiprocessing.Process(target=work1, args=(10, 100,1000))
    • kwargs 字典 # process_obj = multiprocessing.Process(target=work1, kwargs={"c": 1000, "a": 10, "b": 100})
    • 混合args 和 kwargs # process_obj = multiprocessing.Process(target=work1, args=(10, ), kwargs={"c": 1000, "b": 100})
  • 进程间共享全局变量的问题
    • 进程间是不能够共享全局变量
    • 底层原理:子进程会复制主进程的资源到内部运行
import time
import multiprocessing

def work1(a, b, c):
    for i in range(10):
        print(f"正在运行work1... {a}-{b}-{c}")
        time.sleep(0.5)

if __name__ == '__main__':
    # process_obj = multiprocessing.Process(target=work1, args=(10, 20, 30))
    # process_obj = multiprocessing.Process(target=work1, kwargs={"a": 10, "b": 20, "c": 30})
    process_obj = multiprocessing.Process(target=work1, args=(10, ), kwargs={"c": 30, "b": 20})
    process_obj.start()

# 正在运行work1... 10-20-30
# 正在运行work1... 10-20-30
import multiprocessing
import time

# 定义全局变量
g_num = 10

def work1():
    global g_num
    for i in range(10):
        g_num += 1
    print(f"work1 g_num:", g_num)

def work2():
    time.sleep(3)
    print(f"work2 g_num:", g_num)

if __name__ == '__main__':
    process_obj1 = multiprocessing.Process(target=work1)
    process_obj2 = multiprocessing.Process(target=work2)
    process_obj1.start()
    process_obj2.start()
    print(f"main g_num:", g_num)

# main g_num: 10
# work1 g_num: 20
# work2 g_num: 10

进程-守护主进程

  • 进程守护: 子进程和主进程的一种约定,当主进程结束的时候,子进程也随之结束
    • process_obj.daemon = True
  • 结束子进程: terminate() 终止子进程的执行
    • process_obj.terminate()
import time
import multiprocessing

def work1():
    for i in range(10):
        print(f"正在运行work1... {i}")
        time.sleep(0.5)

if __name__ == '__main__':
    process_obj = multiprocessing.Process(target=work1)
    process_obj.daemon = True
    process_obj.start()
    # 终止子线程
    process_obj.terminate()

    # 主进程睡眠
    time.sleep(2)
    print("主进程即将关闭")
    exit()

进程、线程对比

  • 进程和线程的对比
    • 进程是资源分配的基本单位,线程是CPU调度的基本单位
    • 进程运行需要独立的内存资源, 线程只需要必不可少的一点运行资源
    • 线程与同属一个进程的其他线程共享进程所拥有的全部资源
    • 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高程序的运行效率
    • 一个程序至少有一个进程,一个进程至少有一个线程
    • 进程切换慢,线程切换更快
    • 线程不能独立运行,必须运行在进程中(进程能提供资源)
    • CPU密集型 进程优先, I/O 密集型使用线程
    • 程序更稳定进程,线程相比较不够稳定
  • 不是非此即彼,而是组合使用

进程-消息队列基本操作

可以使用multiprocessing模块的Queue实现多进程之间的数据传递

  • 消息队列的学习的目的:为了实现进程间的通信
  • 队列的创建:
    • 导入模块 multiprocessing
    • multiprocessing.Queue(5) # 队列长度为5,不指定长度则没有上限
  • 队列的操作:
    • 放入值 queue.put(值) -->从队列尾部放入值
    • 取值: queue.get() --> 从队列头部取值
    • xxxx_nowait() 方式
    • 放入值 put_nowait() 特点:队列未满,同put() ,但是队列已满,会报错,不等待
    • 取值 get_nowait() 特点:队列未空,同get() ,但是队列已空,会报错,不等待
import multiprocessing

# 创建队列 multiprocessing.Queue(n)  n表示队列长度
queue = multiprocessing.Queue(5)

# 向队列中放入值 queue.put(值)
queue.put(1)
queue.put("hello")
queue.put([1, 2, 3])
queue.put((4, 5, 6))
queue.put({"a": 10, "b": 20, "c": 30})
# queue.put("test")   # 长度为5,放入第6个值后,队列就进入了阻塞状态,默认会等待队列先取出值再放入新的值
# queue.put_nowait("test")  # 队列已满不再等待,会直接报错 queue.Full
print(queue.get())
print(queue.get())
print(queue.get())
print(queue.get())
print(queue.get())
# print(queue.get())   # 当队列已空后,再取值时会进入等待状态
# print(queue.get_nowait())   # 当队列已空后,queue.get_nowait() 不会等待放入新的值,直接报错
1
hello
[1, 2, 3]
(4, 5, 6)
{'a': 10, 'b': 20, 'c': 30}

进程-消息队列常见判断

  • 常见的判断
    • full() 判断是否已满
    • empty() 判断是否为空
  • 取出队列中消息的数量
    • qsize()
import multiprocessing

# 创建队列 multiprocessing.Queue(n)  n表示队列长度
queue = multiprocessing.Queue(5)

# 判断是否已满
print(queue.full())

# 判断是否为空
print(queue.empty())

# 向队列中放入值 queue.put(值)
queue.put(1)
queue.put("hello")
queue.put([1, 2, 3])
queue.put((4, 5, 6))
queue.put({"a": 10, "b": 20, "c": 30})

# 判断队列中消息的数量
print(queue.qsize())

# 判断是否已满
print(queue.full())

# 取出一个字
queue.get()

# 判断队列中消息的数量
print(queue.qsize())
# 判断是否已满
print(queue.full())
False
True
5
True
0
False

进程-Queue实现进程间通信

  • 思路:利用队列在两个进程间进行传递,进而实现数据共享
    • write_queue(queue)
    • read_queue(queue)
    • 创建一个空队列
    • 把空队列作为参数,先把队列传递给写进程,然后把队列再传递给读进程
  • join()优先让一个进程先执行完成,另外一个进程才能*
import multiprocessing
import time

# 写入数据到队列的函数
def write_queue(queue):
    for i in range(10):
        if queue.full():
            print("队列已满")
            break
        queue.put(i)
        print("写入", i)
        time.sleep(0.5)

# 读取队列数据并显示的函数
def read_queue(queue):
    while True:
        if queue.qsize() == 0:
            print("队列已空")
            break
        print(queue.get())

if __name__ == '__main__':
    # 创建一个空队列
    queue = multiprocessing.Queue(5)
    #创建两个进程,分别写数据、读数据
    write_queue_process = multiprocessing.Process(target=write_queue, args=(queue, ))
    read_queue_queue_process = multiprocessing.Process(target=read_queue, args=(queue, ))
    write_queue_process.start()
    # 优先让写进程执行结束后再读取数据
    write_queue_process.join()
    read_queue_queue_process.start()

进程池

  • 进程池:是一个进程的容器,可以自动帮我们创建指定数量的进程,并且管理进程及工作
    • 创建方法:
    • 导入模块
    • 创建进程池 pool = multiprocessing.Pool(3)
    • 工作方式:
    • 同步方式: pool.apply(函数名, (参数1, 参数2, ....)) 进程池中的进程,一个执行完毕后另外一个才能执行,多个进程执行有先后顺序
    • 异步方式: pool.apply_async(函数名, (参数1, 参数2, ....)) 进程池中的进程,多个进程同时执行,没有先后顺序
      • 进程池要 close() pool.close() 表示不再接受新的任务
      • 还要join() pool.join() 表示让主进程等待进程池执行结束后再退出
# 同步方式
import multiprocessing
import time

def copy_work(file):
    print("正在拷贝文件。。。", multiprocessing.current_process())
    time.sleep(0.5)

if __name__ == '__main__':
    # 创建进程池,最大允许创建2个进程
    pool = multiprocessing.Pool(2)
    for i in range(10):
        pool.apply(copy_work, ("file", ))

# 正在拷贝文件。。。 <SpawnProcess name='SpawnPoolWorker-1' parent=184836 started daemon>
# 正在拷贝文件。。。 <SpawnProcess name='SpawnPoolWorker-2' parent=184836 started daemon> ......
# 异步方式 需要做2点
# 1、此进程池需要close表示不再接受新的任务,
# 2、主进程不再等待进程池执行结束后再退出,需要进程池join(),让主进程等待进程池执行结束后再退出
import multiprocessing
import time

def copy_work(file):
    print("正在拷贝文件。。。", multiprocessing.current_process())
    time.sleep(0.5)

if __name__ == '__main__':
    # 创建进程池,最大允许创建2个进程
    pool = multiprocessing.Pool(2)
    for i in range(10):
        pool.apply_async(copy_work, ("file", ))
    pool.close()
    pool.join()

# 多个进程同时执行,没有先后顺序

Queue实现进程间通信

  • apply_async() 返回值 ApplyResult对象,该对象由一个 wait() 的方法
  • wait() 方法类似join() 表示先让当前进程执行完毕,后续进程才能启动
import multiprocessing
import time

# 写入数据到队列的函数
def write_queue(queue):
    for i in range(10):
        if queue.full():
            print("队列已满")
            break
        queue.put(i)
        print("写入", i)
        time.sleep(0.5)

# 读取队列数据并显示的函数
def read_queue(queue):
    while True:
        if queue.qsize() == 0:
            print("队列已空")
            break
        print(queue.get())

if __name__ == '__main__':
    # 创建进程池
    pool = multiprocessing.Pool(2)
    # 创建进程池中的队列
    queue = multiprocessing.Manager().Queue(5)
    # 使用进程池执行任务
    # 同步方式
    # pool.apply(write_queue, (queue, ))
    # pool.apply(read_queue, (queue, ))

    # 异步方式
    # apply_async() 返回值 ApplyResult对象,该对象由一个 wait() 的方法
    # wait() 方法类似join() 表示先让当前进程执行完毕,后续进程才能启动
    result = pool.apply_async(write_queue, (queue, ))
    result.wait()
    pool.apply_async(read_queue, (queue, ))
    # close()表示不再接收新的任务
    pool.close()
    # 主进程会等待进程池执行结束后再退出
    pool.join()

案例-文件夹copy器(多进程版)

"""
./test/ ---------> test2/
思路:
1、定义变量,保存源文件夹、目标文件夹所在的路径
2、在目标路径创建新的文件夹
3、获取源文件夹中的所有的文件(列表)
4、遍历列表,得到所有的文件名
5、定义函数,进行文件拷贝

文件拷贝函数:
参数: 源文件夹路径 目标文件夹路径 文件名
1、拼接源文件和目标文件的具体路径
2、打开源文件,创建目标文件
3、读取源文件的内容,写入到目标文件中 (while)
"""
import os
import multiprocessing

def copy_work(source_dir, dest_dir, file_name):
    """根据参数,拷贝文件"""
    print(multiprocessing.current_process())

    # 1、拼接源文件和目标文件的具体路径
    # ./test/1.txt    ----> /home/demo/Desktop/test/1.txt
    # 2、打开源文件,创建目标文件
    source_path = source_dir + "/" + file_name
    dest_path = dest_dir + "/" + file_name
    # print(source_path,"------>" ,dest_path)

    # 3、读取源文件的内容,写入到目标文件中 (while )
    with open(source_path, "rb") as source_file:
        # 创建目标文件
        with open(dest_path, "wb") as dest_file:
            while True:
                # 读源文件,保存到目标文件
                file_data = source_file.read(1024)
                # 判断文件是否读取完成
                if file_data:
                    dest_file.write(file_data)
                else:
                    break

if __name__ == '__main__':
    # 1、定义变量,保存源文件夹、目标文件夹所在的路径
    source_dir = "test"
    dest_dir = "test2"

    # 2、在目标路径创建新的文件夹
    # os.mkdir()  在指定位置创建文件夹
    try:
        os.mkdir(dest_dir)
    except Exception as e:
        print("文件夹已经存在!")

    # 3、获取源文件夹中的所有的文件(列表)
    file_list = os.listdir(source_dir)

    # 创建进程池
    pool = multiprocessing.Pool(3)

    # 4、遍历列表,得到所有的文件名
    for file_name in file_list:
        # print(file_name)
        # 5、定义函数,进行文件拷贝
        # copy_work(source_dir, dest_dir, file_name)
        pool.apply_async(copy_work, (source_dir, dest_dir, file_name))

    # close() 不再接收新的任务
    pool.close()
    # 让主进程等待进程池结束后在退出
    pool.join()