多任务、多线程
多任务
- 多任务: 同一时间有多个任务在执行,这就是多任务
- python程序默认是单任务
import time
# 唱歌的函数
def sing():
for i in range(3):
print("正在唱歌。。。")
time.sleep(0.5)
# 跳舞的函数
def dangce():
for i in range(3):
print("正在跳舞。。。")
time.sleep(0.5)
# 调用
if __name__ == '__main__':
sing()
dangce()
正在唱歌。。。
正在唱歌。。。
正在唱歌。。。
正在跳舞。。。
正在跳舞。。。
正在跳舞。。。
线程
- 线程是CPU调度的基本单元
- 主线程:程序启动后有一个默认的主线,通常称为主线程
- 作用:1)创建子线程 2)等其他子线程执行结束后,做关闭操作
- 子线程:程序的一个分支
- 子线程创建
- 导入模块 threading
- 创建线程对象 threading.Thread(target=执行的分支函数名) # 注意,函数名没有()
- 启动子线程 线程对象.start*
import time, threading
# 唱歌的函数
def sing():
for i in range(3):
print("正在唱歌。。。")
time.sleep(0.5)
# 跳舞的函数
def dangce():
for i in range(3):
print("正在跳舞。。。")
time.sleep(0.5)
# 调用
if __name__ == '__main__':
sing_thread = threading.Thread(target=sing)
dangce_thread = threading.Thread(target=dangce)
sing_thread.start()
dangce_thread.start()
正在唱歌。。。
正在跳舞。。。
正在跳舞。。。正在唱歌。。。
正在唱歌。。。正在跳舞。。。
线程数量
- 获取线程的数量:正在活跃的线程数量
- threading.enumerate() --- 当前活跃的线程对象列表
- 长度: len(threading.enumerate())
- 获取线程的名称
- threading.current_thread() 获取当前的线程对象,对象中含有名称
import time, threading
# 唱歌的函数
def sing():
for i in range(3):
print(f"正在唱歌。。。{threading.current_thread()}")
time.sleep(0.5)
# 跳舞的函数
def dangce():
for i in range(3):
print(f"正在跳舞。。。{threading.current_thread()}")
time.sleep(0.5)
# 调用
if __name__ == '__main__':
print(len(threading.enumerate()))
sing_thread = threading.Thread(target=sing)
dangce_thread = threading.Thread(target=dangce)
sing_thread.start()
dangce_thread.start()
print(threading.current_thread())
print(len(threading.enumerate()))
print(threading.enumerate())
6
正在唱歌。。。
正在跳舞。。。
<_MainThread(MainThread, started 387132)>
8
[<_MainThread(MainThread, started 387132)>, , , , , , , ]
正在唱歌。。。正在跳舞。。。
正在跳舞。。。正在唱歌。。。
线程的参数及顺序
- 线程参数
- 元组 args
- threading.Tread(target=xxx, args=(参数1, 参数2, ....))
- 元组中元素的顺序和函数的参数顺序一致
- 字典 kwargs
- 使用字典传递 threading.Thread(target=xxx, kwargs={"参数名": 参数值, ...})
- thread_sing = threading.Thread(kwargs={"a": 10, "c": 1000, "b": 100})
- 混合元组和字典
- 混合使用元组和字典 threading.Thread(target=xxx, args=(参数1,参数2,...), kwargs={"参数名": 参数值,...})
- thread_sing = threading.Thread(target=sing, args=(10, ), kwargs={"c": 1000, "b": 100})
- 线程顺序: 线程的执行顺序是无序的
- 线程有CPU调度执行,CPU会根据系统运行状态按照自己的调度算法去调度执行
import time, threading
# 唱歌的函数
def sing(a, b, c):
for i in range(3):
print(f"正在唱歌。{a}。{b}。{c},当前线程:{threading.current_thread()}")
time.sleep(0.5)
# 跳舞的函数
def dangce(a, b, c):
for i in range(3):
print(f"正在跳舞。{a}。{b}。{c},当前线程:{threading.current_thread()}")
time.sleep(0.5)
# 调用
if __name__ == '__main__':
sing_thread = threading.Thread(target=sing, args=(1, 2, 3))
dangce_thread = threading.Thread(target=dangce, kwargs={"a": 4, "b": 5, "c": 6})
sing_thread.start()
dangce_thread.start()
正在唱歌。1。2。3,当前线程:正在跳舞。4。5。6,当前线程:
正在跳舞。4。5。6,当前线程:正在唱歌。1。2。3,当前线程:
正在唱歌。1。2。3,当前线程:正在跳舞。4。5。6,当前线程:
线程-守护线程
守护线程:子线程和主线程的一种约定(当主线程结束后,子线程也结束)
- 子线程.setDaemon(True)
- 设置需要在start()之前
import time, threading
def work():
for i in range(10):
print(f"正在执行work{i}")
time.sleep(0.5)
if __name__ == '__main__':
work_thread = threading.Thread(target=work)
work_thread.start()
# 让主线退出
time.sleep(2)
exit()
# 主线程退出后子线程仍然执行完函数
import time, threading
def work():
for i in range(10):
print(f"正在执行work{i}")
time.sleep(0.5)
if __name__ == '__main__':
work_thread = threading.Thread(target=work)
# work_thread.setDaemon(True) python3.12弃用不推荐
work_thread.daemon=True
work_thread.start()
# 让主线退出
time.sleep(2)
exit()
# 主线程退出后子线程立刻停止执行
并行和并发
- 并发: 任务数大于CPU的核心数,多个任务并发执行
- 并行: 任务数小于CPU核心数,多核处理器同一时刻,多个任务同时执行
自定义线程类
- 通过继承threading.Thread可以实现自自定义线程
-
自定义线程类的步骤
- 导入模块
- 创建类并且继承 threading.Thread
- class MyThread(threading.Thread):
-
重写父类的run方法
- def run(self):
-
创建对象并且调用.start()
- mythread = MyThread()
- mythread.start()
-
底层原理
- Thread类:
- run方法
- start()
- start()中调用了run方法
-
自定义线程类的init方法问题
- 子类先通过super调用父类的初始化方法,子类再初始化
def __init__(self,num):
# 先去调用父类的init方法
super().__init__()
self.num = num
import threading, time
# 自定义线程类
class MyThread(threading.Thread):
def __init__(self, num):
# 先去调用父类的init方法
super().__init__()
self.num = num
# 重写父类的run方法
def run(self):
for i in range(3):
# self.name 从父类继承的一个属性
print("正在执行子线程的run方法。。。", i, self.name, self.num)
time.sleep(0.5)
if __name__ == '__main__':
print("主线程执行中。。。")
mythread = MyThread(10)
mythread.start()
主线程执行中。。。
正在执行子线程的run方法。。。 0 Thread-14 10
正在执行子线程的run方法。。。 1 Thread-14 10
正在执行子线程的run方法。。。 2 Thread-14 10
多线程-共享全局变量
- 多线程之间可以共享全局变量
- 存在的问题:多个线程同时访问同一个资源,出现资源竞争的问题
- 解决方法:优先让某个线程先执行
线程对象.join()
缺点:把多线程变成了单线程,影响整体性能
import threading
import time
# 定义全局变量
num = 0
def work1():
# 声明全局变量
global num
for i in range(1000000):
num += 1
print(f"work1----{num}")
def work2():
global num
for i in range(1000000):
num += 1
print(f"work2----{num}")
if __name__ == '__main__':
t1 = threading.Thread(target=work1)
t2 = threading.Thread(target=work2)
t1.start()
# 优先让t1 先执行完成
t1.join()
t2.start()
# print(len(threading.enumerate())) # 6
# 确保t1 t2线程执行过多毕后打印num值
while len(threading.enumerate()) != 6:
time.sleep(1)
print(f"main----{num}")
# 存在的问题:多个线程同时访问同一个资源,出现资源竞争的问题
# work1----1363814
# work2----2000000
# main----2000000
# t1.join() 让t1优先执行
# work1----1000000
# work2----2000000
# main----2000000
work1----1000000
work2----2000000
main----2000000
同步、异步
- 同步:在多任务中 多个任务执行有先后顺序,一个执行完毕后,另外一个再执行
- 异步:在多任务中多个任务执行没有先后顺序,多个任务同时执行
- 线程的锁机制:当线程获取资源后,立刻进行锁定,资源使用完毕后再解锁,有效的保证同一时间只有一个线程在使用资源
互斥锁
- 互斥锁使用分三步:
- 创建一把锁:
lock1 = threading.Lock() - 上锁:
lock1.acquire() - 解锁:
lock1.release()
- 创建一把锁:
- 互斥锁的使用原则:尽可能少的锁定竞争资源
死锁
- 死锁:在多线程中,两个线程都占用一些资源 ,而且同一时间都在等待对方释放资源,这种状态就是死锁状态
- 避免:锁使用完毕后,要及时释放
# 互斥锁
import threading
num = 0
def work1():
global num
lock1.acquire()
for i in range(1000000):
num += 1
lock1.release()
print(f"work1----{num}")
def work2():
global num
lock1.acquire()
for i in range(1000000):
num += 1
lock1.release()
print(f"work2----{num}")
if __name__ == '__main__':
# 创建一把互拆锁
lock1 = threading.Lock()
t1 = threading.Thread(target=work1)
t2 = threading.Thread(target=work2)
t1.start()
t2.start()
# print(len(threading.enumerate())) # 8
# 确保t1 t2线程执行过多毕后打印num值
while len(threading.enumerate()) != 8:
time.sleep(1)
print(f"main----{num}")
work1----1000000
work2----2000000
main----2000000
# 死锁
import threading
# 定义函数,根据下标获取列表元素值
def get_value(index):
data_list = [1, 2, 3, 4, 5, 6]
lock.acquire()
# 判断下标位置是否正确
if index >= len(data_list):
print("下标越界!", index)
lock.release() # 程序提前退出,锁未释放,会导致后线线程等待解锁,形成死锁
return
print(data_list[index])
lock.release()
# 创建10个线程,观察资源的等待状态
if __name__ == '__main__':
# 创建锁
lock = threading.Lock()
for i in range(10):
t = threading.Thread(target=get_value, args=(i, ))
t.start()
1
2
3
4
5
6
下标越界! 6
下标越界! 7
下标越界! 8
下标越界! 9
多任务版UDP聊天器
- 创建一个子线程,单独用来接收信息
- 删除 接收信息的菜单和判断
- 可以接收多条信息
- 让子线程守护主线程
import socket
import threading
def send_message(udp_socket):
"""返送信息的函数"""
# 1) 定义变量接收用户与输入的接收方的IP地址
ipaddr = input("请输入接收方的IP地址:\n")
# 判断是否需要默认
if len(ipaddr) == 0:
ipaddr = "168.12.1.50"
print("当前接收方默认IP设置为[%s]" % ipaddr)
# 2)定义变量接收用户与输入的接收方的端口号
port = input("请输入接收方的端口号:\n")
if len(port) == 0:
port = "8888"
print("当前接收方默认端口设置为[%s]" % port)
# 3)定义变量接收用户与输入的接收方的内容
content = input("请输入要发送的内容:\n")
# 4)使用socket的sendto()发送信息
udp_socket.sendto(content.encode(), (ipaddr, int(port)))
def recv_msg(udp_socket):
"""接收信息的函数"""
while True:
# 1) 使用socket接收数据
recv_data, ip_port = udp_socket.recvfrom(1024)
# 2)解码数据
recv_txt = recv_data.decode()
# 3)输出显示
print('收到', ip_port, '发送的消息:', recv_txt)
def main():
"""程序主入口"""
# 1)创建套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 2)绑定端口
udp_socket.bind(("", 6666))
# 创建子线程,单独接收用户发送的信息
thread_recvmsg = threading.Thread(target=recv_msg, args=(udp_socket, ))
# 设置子线程守护主线程
thread_recvmsg.daemon=True
thread_recvmsg.start()
# 3)打印菜单(循环)
while True:
print('\n\n*****************************')
print('******* 1、发送信息 *******')
print('******* 2、退出系统 *******')
print('*****************************')
# 4)接收用户输入的选项
sel_num = int(input('请输入选项:\n'))
# 5)判断用户的选择,并且调用对应的函数
if sel_num == 1:
print('您选择的是发送信息')
send_message(udp_socket)
elif sel_num == 2:
print('系统正在退出中...')
print('退出完成!')
break
else:
print('选项输入错误,请重新输入')
# 6)关闭套接字
udp_socket.close()
if __name__ == '__main__':
# 程序独立运行的时候,才去启动聊天器
main()
TCP服务端框架
- 支持多线程,思想:每来一个新的客户端,我创建一个新的线程
import socket
import threading
tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_server_socket.bind(("", 8888))
tcp_server_socket.listen(128) # windows有效,linux此数字无效
def recv_msg(new_client_socket, client_ip_port):
while True:
# recv() 会让程序再次阻塞,收到信息后再接阻塞
recv_data = new_client_socket.recv(1024)
# 当接受到数据为 空 的时候,表示客户端已经断开连接了,服务端也要断开
# if len(recv_data)!= 0:
# b'xxxx'
# 如果recv_data 非空即为真,否则为假
if recv_data:
recv_text = recv_data.decode()
if recv_text != "exit":
print("接收到[%s]的信息:%s" % (str(client_ip_port), recv_text))
else:
print("客户端已经断开连接!")
break
else:
print("客户端已经断开连接!")
break
while True:
new_client_socket, client_ip_port = tcp_server_socket.accept()
new_client_socket.send("helloworld".encode())
print(f"新客户端[%s]连接成功" % str(client_ip_port))
thread_recv_msg = threading.Thread(target=recv_msg, args=(new_client_socket, client_ip_port))
# recv_msg(new_client_socket, client_ip_port)
thread_recv_msg.daemon = True
thread_recv_msg.start()
新客户端[('127.0.0.1', 53222)]连接成功
接收到[('127.0.0.1', 53222)]的信息:dfdfdfdf
接收到[('127.0.0.1', 53222)]的信息:dfdfdfdf
接收到[('127.0.0.1', 53222)]的信息:dfdfdfdf
客户端已经断开连接!