目录
Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池
1.昨日回顾
#生产者消费者模型.# 生产者: 产生数据,# 消费者: 接收数据并做下一步处理# 容器: 队列.#进程, 线程:# 进程就是资源单位,线程就是执行单位.#进程与线程的区别:# 线程: 开销小,速度快,同一个进程下的线程资源内存级别共享.# 进程: 开销巨大,速度慢, 不同进程的数据内存级别不共享.#join: 阻塞,# t1.join() 阻塞.# print('主')#getname, setname .name#activeCount() 线程的数量守护线程: 如果守护线程的生命周期小于其他线程,则他肯定先结束,否则等待其他非守护线程和主线程结束之后结束.#互斥锁,锁
2.死锁现象与递归锁
#递归锁可以解决死锁现象,业务需要多个锁时,先要考虑递归锁
2.1死锁现象
# from threading import Thread# from threading import Lock# import time## lock_A = Lock()# lock_B = Lock()### class MyThread(Thread):## def run(self):# self.f1()# self.f2()### def f1(self):## lock_A.acquire()# print(f'{self.name}拿到了A锁')## lock_B.acquire()# print(f'{self.name}拿到了B锁')## lock_B.release()## lock_A.release()## def f2(self):## lock_B.acquire()# print(f'{self.name}拿到了B锁')## time.sleep(0.1)# lock_A.acquire()# print(f'{self.name}拿到了A锁')## lock_A.release()# lock_B.release()## if __name__ == '__main__':## for i in range(3):# t = MyThread()# t.start()
2.2递归锁
递归锁有一个计数的功能, 原数字为0,上一次锁,计数+1,释放一次锁,计数-1,只要递归锁上面的数字不为零,其他线程就不能抢锁.# from threading import Thread# from threading import RLock# import time## lock_A = lock_B = RLock()# 递归锁有一个计数的功能, 原数字为0,上一次锁,计数+1,释放一次锁,计数-1,# 只要递归锁上面的数字不为零,其他线程就不能抢锁.# class MyThread(Thread):## def run(self):# self.f1()# self.f2()### def f1(self):## lock_A.acquire()# print(f'{self.name}拿到了A锁')## lock_B.acquire()# print(f'{self.name}拿到了B锁')## lock_B.release()## lock_A.release()## def f2(self):## lock_B.acquire()# print(f'{self.name}拿到了B锁')## time.sleep(0.1)# lock_A.acquire()# print(f'{self.name}拿到了A锁')## lock_A.release()## lock_B.release()## if __name__ == '__main__':## for i in range(3):# t = MyThread()# t.start()
3.信号量
也是一种锁,控制并发数量# from threading import Thread, Semaphore, current_thread# import time# import random# sem = Semaphore(5)## def task():# sem.acquire()## print(f'{current_thread().name} 厕所ing')# time.sleep(random.randint(1,3))## sem.release()### if __name__ == '__main__':# for i in range(20):# t = Thread(target=task,)# t.start()
4.GIL全局解释器锁
4.1背景
#理论上来说:单个进程的多线程可以利用多核.
#但是,开发Cpython解释器的程序员,给进入解释器的线程加了锁.
4.2为什么加锁
1. 当时都是单核时代,而且cpu价格非常贵.2. 如果不加全局解释器锁, 开发Cpython解释器的程序员就会在源码内部各种主动加锁,解锁,非常麻烦,各种死锁现象等等.他为了省事儿,直接进入解释器时给线程加一个锁. 优点: 保证了Cpython解释器的数据资源的安全. 缺点: 单个进程的多线程不能利用多核.#Jpython没有GIL锁.#pypy也没有GIL锁.#现在多核时代, 我将Cpython的GIL锁去掉行么?#因为Cpython解释器所有的业务逻辑都是围绕着单个线程实现的,去掉这个GIL锁,几乎不可能.单个进程的多线程可以并发,但是不能利用多核,不能并行.多个进程可以并发,并行.
5.GIL与Lock锁的区别
相同点: 都是同种锁,互斥锁.不同点: GIL锁全局解释器锁,保护解释器内部的资源数据的安全. GIL锁 上锁,释放无需手动操作. 自己代码中定义的互斥锁保护进程中的资源数据的安全. 自己定义的互斥锁必须自己手动上锁,释放锁.
6.验证计算密集型IO密集型的效率
6.1 IO密集型
# IO密集型: 单个进程的多线程并发 vs 多个进程的并发并行# def task():# count = 0# time.sleep(random.randint(1,3))# count += 1# if __name__ == '__main__':# 多进程的并发,并行# start_time = time.time()# l1 = []# for i in range(50):# p = Process(target=task,)# l1.append(p)# p.start()## for p in l1:# p.join()## print(f'执行效率:{time.time()- start_time}') # 8.000000000# 多线程的并发# start_time = time.time()# l1 = []# for i in range(50):# p = Thread(target=task,)# l1.append(p)# p.start()## for p in l1:# p.join()## print(f'执行效率:{time.time()- start_time}') # 3.0294392108917236对于IO密集型: 单个进程的多线程的并发效率高.
6.2 计算密集型
#from threading import Thread#from multiprocessing import Process#import time#import random# # 计算密集型: 单个进程的多线程并发 vs 多个进程的并发并行## def task():# count = 0# for i in range(10000000):# count += 1### if __name__ == '__main__':## # 多进程的并发,并行# # start_time = time.time()# # l1 = []# # for i in range(4):# # p = Process(target=task,)# # l1.append(p)# # p.start()# ## # for p in l1:# # p.join()# ## # print(f'执行效率:{time.time()- start_time}') # 3.1402080059051514## # 多线程的并发# # start_time = time.time()# # l1 = []# # for i in range(4):# # p = Thread(target=task,)# # l1.append(p)# # p.start()# ## # for p in l1:# # p.join()# ## # print(f'执行效率:{time.time()- start_time}') # 4.5913777351379395总结: 计算密集型: 多进程的并发并行效率高.
7.多线程实现socket通信
#无论是多线程还是多进程,如果按照上面的写法,来一个客户端请求,我就开一个线程,来一个请求开一个线程,#应该是这样: 你的计算机允许范围内,开启的线程进程数量越多越好.
7.1服务端
# import socket# from threading import Thread## def communicate(conn,addr):# while 1:# try:# from_client_data = conn.recv(1024)# print(f'来自客户端{addr[1]}的消息: {from_client_data.decode("utf-8")}')# to_client_data = input('>>>').strip()# conn.send(to_client_data.encode('utf-8'))# except Exception:# break# conn.close()## def _accept():# server = socket.socket()# server.bind(('127.0.0.1', 8848))# server.listen(5)## while 1:# conn, addr = server.accept()# t = Thread(target=communicate,args=(conn,addr))# t.start()## if __name__ == '__main__':# _accept()
7.2客户端
# import socket# client = socket.socket()# client.connect(('127.0.0.1',8848))## while 1:# try:# to_server_data = input('>>>').strip()# client.send(to_server_data.encode('utf-8'))## from_server_data = client.recv(1024)# print(f'来自服务端的消息: {from_server_data.decode("utf-8")}')## except Exception:# break# client.close()
8.进程池,线程池
#线程池: 一个容器,这个容器限制住你开启线程的数量,比如4个,第一次肯定只能并发的处理4个任务,只要有任务完成,线程马上就会接下一个任务.以时间换空间.# from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor# import os# import time# import random# # # print(os.cpu_count())# def task(n):# print(f'{os.getpid()} 接客')# time.sleep(random.randint(1,3))# # # if __name__ == '__main__': # 开启进程池 (并行(并行+并发)) # p = ProcessPoolExecutor() # 默认不写,进程池里面的进程数与cpu个数相等 # # # p.submit(task,1) # # p.submit(task,1) # # p.submit(task,1) # # p.submit(task,1) # # p.submit(task,1) # # p.submit(task,1) # # p.submit(task,1) # for i in range(20): # p.submit(task,i) # # 开启线程池 (并发) # t = ThreadPoolExecutor() # 默认不写, cpu个数*5 线程数 # t = ThreadPoolExecutor(100) # 100个线程 # for i in range(20): # t.submit(task,i)