6.多任务、多线程

多任务、多线程

多任务

  • 多任务: 同一时间有多个任务在执行,这就是多任务
  • python程序默认是单任务
import time
# 唱歌的函数
def sing():
      for i in range(3):
        print("正在唱歌。。。")
        time.sleep(0.5)
# 跳舞的函数
def dangce():
      for i in range(3):
        print("正在跳舞。。。")
        time.sleep(0.5)
# 调用
if __name__ == '__main__':
        sing()
        dangce()
正在唱歌。。。
正在唱歌。。。
正在唱歌。。。
正在跳舞。。。
正在跳舞。。。
正在跳舞。。。

线程

  • 线程是CPU调度的基本单元
  • 主线程:程序启动后有一个默认的主线,通常称为主线程
    • 作用:1)创建子线程 2)等其他子线程执行结束后,做关闭操作
  • 子线程:程序的一个分支
  • 子线程创建
    • 导入模块 threading
    • 创建线程对象 threading.Thread(target=执行的分支函数名) # 注意,函数名没有()
    • 启动子线程 线程对象.start*
import time, threading
# 唱歌的函数
def sing():
    for i in range(3):
        print("正在唱歌。。。")
        time.sleep(0.5)
# 跳舞的函数
def dangce():
    for i in range(3):
        print("正在跳舞。。。")
        time.sleep(0.5)
# 调用
if __name__ == '__main__':
    sing_thread = threading.Thread(target=sing)
    dangce_thread = threading.Thread(target=dangce)
    sing_thread.start()
    dangce_thread.start()
正在唱歌。。。
正在跳舞。。。

正在跳舞。。。正在唱歌。。。

正在唱歌。。。正在跳舞。。。

线程数量

  • 获取线程的数量:正在活跃的线程数量
    • threading.enumerate() --- 当前活跃的线程对象列表
    • 长度: len(threading.enumerate())
  • 获取线程的名称
    • threading.current_thread() 获取当前的线程对象,对象中含有名称
import time, threading
# 唱歌的函数
def sing():
    for i in range(3):
        print(f"正在唱歌。。。{threading.current_thread()}")
        time.sleep(0.5)
# 跳舞的函数
def dangce():
    for i in range(3):
        print(f"正在跳舞。。。{threading.current_thread()}")
        time.sleep(0.5)
# 调用
if __name__ == '__main__':
    print(len(threading.enumerate()))
    sing_thread = threading.Thread(target=sing)
    dangce_thread = threading.Thread(target=dangce)
    sing_thread.start()
    dangce_thread.start()
    print(threading.current_thread())
    print(len(threading.enumerate()))
    print(threading.enumerate())
6
正在唱歌。。。
正在跳舞。。。
<_MainThread(MainThread, started 387132)>
8
[<_MainThread(MainThread, started 387132)>, , , , , , , ]

正在唱歌。。。正在跳舞。。。

正在跳舞。。。正在唱歌。。。

线程的参数及顺序

  • 线程参数
    • 元组 args
    • threading.Tread(target=xxx, args=(参数1, 参数2, ....))
    • 元组中元素的顺序和函数的参数顺序一致
    • 字典 kwargs
    • 使用字典传递 threading.Thread(target=xxx, kwargs={"参数名": 参数值, ...})
    • thread_sing = threading.Thread(kwargs={"a": 10, "c": 1000, "b": 100})
    • 混合元组和字典
    • 混合使用元组和字典 threading.Thread(target=xxx, args=(参数1,参数2,...), kwargs={"参数名": 参数值,...})
    • thread_sing = threading.Thread(target=sing, args=(10, ), kwargs={"c": 1000, "b": 100})
  • 线程顺序: 线程的执行顺序是无序的
    • 线程有CPU调度执行,CPU会根据系统运行状态按照自己的调度算法去调度执行
import time, threading
# 唱歌的函数
def sing(a, b, c):
    for i in range(3):
        print(f"正在唱歌。{a}。{b}。{c},当前线程:{threading.current_thread()}")
        time.sleep(0.5)
# 跳舞的函数
def dangce(a, b, c):
    for i in range(3):
        print(f"正在跳舞。{a}。{b}。{c},当前线程:{threading.current_thread()}")
        time.sleep(0.5)
# 调用
if __name__ == '__main__':
    sing_thread = threading.Thread(target=sing, args=(1, 2, 3))
    dangce_thread = threading.Thread(target=dangce, kwargs={"a": 4, "b": 5, "c": 6})
    sing_thread.start()
    dangce_thread.start()
正在唱歌。1。2。3,当前线程:正在跳舞。4。5。6,当前线程:

正在跳舞。4。5。6,当前线程:正在唱歌。1。2。3,当前线程:

正在唱歌。1。2。3,当前线程:正在跳舞。4。5。6,当前线程:

线程-守护线程

守护线程:子线程和主线程的一种约定(当主线程结束后,子线程也结束)

  • 子线程.setDaemon(True)
  • 设置需要在start()之前
import time, threading

def work():
    for i in range(10):
        print(f"正在执行work{i}")
        time.sleep(0.5)

if __name__ == '__main__':
    work_thread = threading.Thread(target=work)
    work_thread.start()
    # 让主线退出
    time.sleep(2)
    exit()
# 主线程退出后子线程仍然执行完函数
import time, threading

def work():
    for i in range(10):
        print(f"正在执行work{i}")
        time.sleep(0.5)

if __name__ == '__main__':
    work_thread = threading.Thread(target=work)
    # work_thread.setDaemon(True) python3.12弃用不推荐
    work_thread.daemon=True
    work_thread.start()
    # 让主线退出
    time.sleep(2)
    exit()
# 主线程退出后子线程立刻停止执行

并行和并发

  • 并发: 任务数大于CPU的核心数,多个任务并发执行
  • 并行: 任务数小于CPU核心数,多核处理器同一时刻,多个任务同时执行

自定义线程类

  • 通过继承threading.Thread可以实现自自定义线程
  • 自定义线程类的步骤

    • 导入模块
    • 创建类并且继承 threading.Thread
    • class MyThread(threading.Thread):
  • 重写父类的run方法

    • def run(self):
  • 创建对象并且调用.start()

    • mythread = MyThread()
    • mythread.start()
  • 底层原理

    • Thread类:
    • run方法
    • start()
    • start()中调用了run方法
  • 自定义线程类的init方法问题

    • 子类先通过super调用父类的初始化方法,子类再初始化
def __init__(self,num):
        # 先去调用父类的init方法
        super().__init__()
        self.num = num
import threading, time

# 自定义线程类
class MyThread(threading.Thread):

    def __init__(self, num):
        # 先去调用父类的init方法
        super().__init__()
        self.num = num

    # 重写父类的run方法
    def run(self):
        for i in range(3):
            # self.name 从父类继承的一个属性
            print("正在执行子线程的run方法。。。", i, self.name, self.num)
            time.sleep(0.5)

if __name__ == '__main__':
    print("主线程执行中。。。")
    mythread = MyThread(10)
    mythread.start()
主线程执行中。。。
正在执行子线程的run方法。。。 0 Thread-14 10

正在执行子线程的run方法。。。 1 Thread-14 10
正在执行子线程的run方法。。。 2 Thread-14 10

多线程-共享全局变量

  • 多线程之间可以共享全局变量
  • 存在的问题:多个线程同时访问同一个资源,出现资源竞争的问题
  • 解决方法:优先让某个线程先执行
线程对象.join()

缺点:把多线程变成了单线程,影响整体性能

import threading
import time

# 定义全局变量
num = 0

def work1():
    # 声明全局变量
    global num
    for i in range(1000000):
        num += 1
    print(f"work1----{num}")

def work2():
    global num
    for i in range(1000000):
        num += 1
    print(f"work2----{num}")

if __name__ == '__main__':
    t1 = threading.Thread(target=work1)
    t2 = threading.Thread(target=work2)

    t1.start()
    # 优先让t1 先执行完成
    t1.join()

    t2.start()

    # print(len(threading.enumerate()))  # 6
    # 确保t1 t2线程执行过多毕后打印num值
    while len(threading.enumerate()) != 6:
        time.sleep(1)
    print(f"main----{num}")

# 存在的问题:多个线程同时访问同一个资源,出现资源竞争的问题
# work1----1363814
# work2----2000000
# main----2000000

# t1.join()  让t1优先执行
# work1----1000000
# work2----2000000
# main----2000000
work1----1000000
work2----2000000
main----2000000

同步、异步

  • 同步:在多任务中 多个任务执行有先后顺序,一个执行完毕后,另外一个再执行
  • 异步:在多任务中多个任务执行没有先后顺序,多个任务同时执行
  • 线程的锁机制:当线程获取资源后,立刻进行锁定,资源使用完毕后再解锁,有效的保证同一时间只有一个线程在使用资源
互斥锁
  • 互斥锁使用分三步:
    • 创建一把锁:
      lock1 = threading.Lock()
    • 上锁:
      lock1.acquire()
    • 解锁:
      lock1.release()
  • 互斥锁的使用原则:尽可能少的锁定竞争资源
死锁
  • 死锁:在多线程中,两个线程都占用一些资源 ,而且同一时间都在等待对方释放资源,这种状态就是死锁状态
  • 避免:锁使用完毕后,要及时释放
# 互斥锁
import threading

num = 0
def work1():
    global num
    lock1.acquire()
    for i in range(1000000):
        num += 1
    lock1.release()
    print(f"work1----{num}")

def work2():
    global num
    lock1.acquire()
    for i in range(1000000):
        num += 1
    lock1.release()
    print(f"work2----{num}")

if __name__ == '__main__':
    # 创建一把互拆锁
    lock1 = threading.Lock()
    t1 = threading.Thread(target=work1)
    t2 = threading.Thread(target=work2)
    t1.start()
    t2.start()

    # print(len(threading.enumerate()))  # 8
    # 确保t1 t2线程执行过多毕后打印num值
    while len(threading.enumerate()) != 8:
        time.sleep(1)
    print(f"main----{num}")
work1----1000000
work2----2000000
main----2000000
# 死锁
import threading

# 定义函数,根据下标获取列表元素值
def get_value(index):
    data_list = [1, 2, 3, 4, 5, 6]
    lock.acquire()
    # 判断下标位置是否正确
    if index >= len(data_list):
        print("下标越界!", index)
        lock.release()  # 程序提前退出,锁未释放,会导致后线线程等待解锁,形成死锁
        return
    print(data_list[index])
    lock.release()

# 创建10个线程,观察资源的等待状态
if __name__ == '__main__':
    # 创建锁
    lock = threading.Lock()
    for i in range(10):
        t = threading.Thread(target=get_value, args=(i, ))
        t.start()
1
2
3
4
5
6
下标越界! 6
下标越界! 7
下标越界! 8
下标越界! 9

多任务版UDP聊天器

  • 创建一个子线程,单独用来接收信息
  • 删除 接收信息的菜单和判断
  • 可以接收多条信息
  • 让子线程守护主线程
import socket
import threading

def send_message(udp_socket):
    """返送信息的函数"""
    # 1) 定义变量接收用户与输入的接收方的IP地址
    ipaddr = input("请输入接收方的IP地址:\n")
    # 判断是否需要默认
    if len(ipaddr) == 0:
        ipaddr = "168.12.1.50"
        print("当前接收方默认IP设置为[%s]" % ipaddr)

    # 2)定义变量接收用户与输入的接收方的端口号
    port = input("请输入接收方的端口号:\n")
    if len(port) == 0:
        port = "8888"
        print("当前接收方默认端口设置为[%s]" % port)

    # 3)定义变量接收用户与输入的接收方的内容
    content = input("请输入要发送的内容:\n")

    # 4)使用socket的sendto()发送信息
    udp_socket.sendto(content.encode(), (ipaddr, int(port)))

def recv_msg(udp_socket):
    """接收信息的函数"""
    while True:
        # 1) 使用socket接收数据
        recv_data, ip_port = udp_socket.recvfrom(1024)
        # 2)解码数据
        recv_txt = recv_data.decode()
        # 3)输出显示
        print('收到', ip_port, '发送的消息:', recv_txt)

def main():
    """程序主入口"""
    # 1)创建套接字
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # 2)绑定端口
    udp_socket.bind(("", 6666))

    # 创建子线程,单独接收用户发送的信息
    thread_recvmsg = threading.Thread(target=recv_msg, args=(udp_socket, ))
    # 设置子线程守护主线程
    thread_recvmsg.daemon=True
    thread_recvmsg.start()

    # 3)打印菜单(循环)
    while True:
        print('\n\n*****************************')
        print('*******   1、发送信息   *******')
        print('*******   2、退出系统   *******')
        print('*****************************')

        # 4)接收用户输入的选项
        sel_num = int(input('请输入选项:\n'))

        # 5)判断用户的选择,并且调用对应的函数
        if sel_num == 1:
            print('您选择的是发送信息')
            send_message(udp_socket)
        elif sel_num == 2:
            print('系统正在退出中...')
            print('退出完成!')
            break
        else:
            print('选项输入错误,请重新输入')

    # 6)关闭套接字
    udp_socket.close()

if __name__ == '__main__':
    # 程序独立运行的时候,才去启动聊天器
    main()

TCP服务端框架

  • 支持多线程,思想:每来一个新的客户端,我创建一个新的线程
import socket
import threading

tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_server_socket.bind(("", 8888))
tcp_server_socket.listen(128)  # windows有效,linux此数字无效

def recv_msg(new_client_socket, client_ip_port):
    while True:
        # recv() 会让程序再次阻塞,收到信息后再接阻塞
        recv_data = new_client_socket.recv(1024)
        # 当接受到数据为 空 的时候,表示客户端已经断开连接了,服务端也要断开
        # if len(recv_data)!= 0:
        # b'xxxx'
        # 如果recv_data 非空即为真,否则为假
        if recv_data:
            recv_text = recv_data.decode()
            if recv_text != "exit":
                print("接收到[%s]的信息:%s" % (str(client_ip_port), recv_text))
            else:
                print("客户端已经断开连接!")
                break
        else:
            print("客户端已经断开连接!")
            break

while True:
    new_client_socket, client_ip_port = tcp_server_socket.accept()
    new_client_socket.send("helloworld".encode())
    print(f"新客户端[%s]连接成功" % str(client_ip_port))
    thread_recv_msg = threading.Thread(target=recv_msg, args=(new_client_socket, client_ip_port))
    # recv_msg(new_client_socket, client_ip_port)
    thread_recv_msg.daemon = True
    thread_recv_msg.start()
新客户端[('127.0.0.1', 53222)]连接成功
接收到[('127.0.0.1', 53222)]的信息:dfdfdfdf
接收到[('127.0.0.1', 53222)]的信息:dfdfdfdf
接收到[('127.0.0.1', 53222)]的信息:dfdfdfdf
客户端已经断开连接!