python_day10_并发编程
10.1 操作系统分时多道技术
10.2 进程、线程
10.3 GIL: 全局解释锁
10.4 锁
同步锁
死锁 递归锁
10.5 同步 与 异步
同步事件、信号量
队列
10.6 生产者-消费者模型
10.7 多进程模块
10.8 进程间通信
进程队列Queue
管道
manager
数据同步
进程池
10.1 操作系统
操作系统的作用:
1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口
2:管理、调度进程,并且将多个进程对硬件的竞争变得有序
多道技术:
1.产生背景:针对单核,实现并发
ps:
现在的主机一般是多核,那么每个核都会利用多道技术
有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个
cpu中的任意一个,具体由操作系统调度算法决定。
2.空间上的复用:如内存中同时有多道程序
3.时间上的复用:复用一个cpu的时间片
强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样
才能保证下次切换回来时,能基于上次切走的位置继续运行
多道程序设计:
在不同用户遇到IO进行切换操作,当用户A IO阻塞时,切换到B用户,当B执行完再处理A
SPOOLING: 外部设备联机操作
分时操作系统: 多个联机操作 + 多道程序设计 类似原先网吧的分屏技术
10.2进程、线程
进程
本质上就是一段程序运行过程(一段抽象概念) 就是一个过程
定义: 进程就是一个程序在数据集上的一次动态执行过程
进程一般是由程序、数据集、进程控制块三部分组成
数据集: 程序运行过程中所需的一切数据资源
进程控制块: 目的是用于切换,状态保存,恢复操作
进程切换: IO切换,时间轮询切换
进程:最小的资源单元
线程:
特点:共享整个进程的内存,数据集
单个的线程不可能脱离进程而存在
一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行
线程:最小的执行单元,微进程 (运行实例)
一个进程至少得有一个线程
程序运行在内存中,CPU在取内存中的数据拿出并执行,
1.什么是进程?进程和程序之间有什么区别?
进程:一个程序的执行实例称为进程;
每个进程都提供执行程序所需的资源。
进程有一个虚拟地址空间、可执行代码、对系统对象的开放句柄、一个安全上下文、一个惟一的进程标识符、环境变量、一个优先级类、最小和最大工作集大小,以及至少一个执行线程;
每个进程都由一个线程启动,这个线程通常被称为主线程,但是可以从它的任何线程中创建额外的线程;
程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程;
程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。
在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行,大大提高了CPU的利用率
2.什么是线程?
进程的缺点有:
进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。
进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。
线程是操作系统能够进行运算调度的最小单位。
它被包含在进程之中,是进程中的实际运作单位。
一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
线程是一个执行上下文,它是一个CPU用来执行指令流的所有信息。
3.进程和线程之间的关系?
线程共享创建它的进程的地址空间;进程有自己的地址空间。(内存地址)
线程可以直接访问其进程的数据段;进程有自己的父进程数据段的副本。
线程可以直接与进程的其他线程通信;进程必须使用进程间通信来与兄弟进程通信。
新线程很容易创建;新进程需要复制父进程。
线程可以对同一进程的线程进行相当大的控制;进程只能对子进程执行控制。
对主线程的更改(取消、优先级更改等)可能会影响流程的其他线程的行为;对父进程的更改不会影响子进程。
程序计数器: 真正保存的就是一个内存地址, 上下文切换的时候在取出上一次的运行状态,并进程恢复执行操作
############# 进程运行,生成子线程,主子线程开始顺序运行,遇至 IO阻塞开始执行下一条。
import threading
import time
def Tv():
print(time.ctime())
print('look Tv')
time.sleep(2)
print(time.ctime())
# 生成一个对象
T=threading.Thread(target=Tv)
# 跟socket一样调用threading父类的start方法执行
T.start()
print('start ')
# 结果
Thu Jan 18 15:00:50 2018
look Tv
start
Thu Jan 18 15:00:52 2018
############ 线程之间可以并发执行顺序执行 ,执行完主线程然后再执行子线程,直到程序退出
import threading
import time
def Tv():
print('tv show start time ',time.ctime())
print('look Tv')
time.sleep(2)
print('tv show end time ',time.ctime())
def Eat():
print('eat start time ',time.ctime())
print('eat ')
time.sleep(2)
print('eat end time ',time.ctime())
# 生成一个对象
T=threading.Thread(target=Tv)
# 跟socket一样调用threading父类的start方法执行
T.start()
T2=threading.Thread(target=Eat)
T2.start()
print('start ')
## 结果
tv show start timeThu Jan 18 15:06:00 2018
look Tv
eat start timeThu Jan 18 15:06:00 2018
eat
start
tv show end timeThu Jan 18 15:06:02 2018
eat end timeThu Jan 18 15:06:02 2018
########当程序中有join时,会先执行子线程的程序,最后再去执行主线程的代码,并发运行
import threading
import time
def Tv():
print('tv show start time ',time.ctime())
print('look Tv')
time.sleep(2)
print('tv show end time ',time.ctime())
def Eat():
print('eat start time ',time.ctime())
print('eat ')
time.sleep(2)
print('eat end time ',time.ctime())
# 生成一个对象
T=threading.Thread(target=Tv)
T2=threading.Thread(target=Eat)
T.start()
T2.start()
T.join()
T2.join()
print('start ')
## 结果
tv show start timeThu Jan 18 15:14:15 2018
look Tv
eat start timeThu Jan 18 15:14:15 2018
eat
tv show end timeThu Jan 18 15:14:17 2018
eat end timeThu Jan 18 15:14:17 2018
start
######## 如果join在start前,那么就会先执行子线程中的程序 然后再执行下一段子线程,就无法实现并发的效果了
setDaemon(True) 守护线程
子线程是否要守护主线程, 当主线程退出之后, 子线程也会直接跟着退出
其它方法:
Thread实例对象的方法
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。
# 继承 threading.Thread 需要重写一下run方法
class Mythread(threading.Thread):
def __init__(self,num):
# 继承父类的 init方法
threading.Thread.__init__(self)
self.num = num
# 重写run方法 跟sockserver的header方法一样
def run(self):
print('测试一下: %s' %self.num)
time.sleep(2)
if __name__ == '__main__':
t1=Mythread(11111)
t2=Mythread(22222)
t1.start()# 使用start方法,类继承方法必须要有run方法
t2.start()
print(t1.getName()) # Thread-1
t1.setName('test_Thread')
print(t1.getName()) # test_Thread
print('主线程跑起来')
# 结果
测试一下: 11111
测试一下: 22222
主线程跑起来
10.3. python GIL全局解释器锁
无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行
任务分为两部分:IO密集型:计算密集型
sleep等同于IO操作
对于IO密集型的任务: python的多线程的是有意义的
可以采用多进程+协程(也解决IO密集型问题)
对于计算密集型的任务:python的多线程就不推荐,
10.4锁
同步锁:
lock=threading.Lock()创建一个对象
lock.acquire()锁住,只能有一个线程被执行
代码块
lock.release()执行完释放
####### 例一
import threading
import time
num=100
def Nums():
global num
lock.acquire()
temp=num
temp-=1
# 当睡眠时间为1时,结果一定是99 同时执行100个线程,每次取出的结果都为99
time.sleep(0.01)
num=temp
lock.release()
print('k')
s=[]
# 增加同步锁功能就跟异步类似,但它也能同时并发操作,比如在加锁前与加锁后进行打印。
lock=threading.Lock() # 同步锁
if __name__ == '__main__':
for i in range(100):
i=threading.Thread(target=Nums)
i.start() #每个对象都需要start一次
s.append(i) #附加
for x in s:
x.join() # 最后一次的join
print(num) # 主线程
死锁 递归锁:
线程间共享多个资源,两个线程分别占有一部分资源,并且同时等待对方释放,就会造成死锁,
##### 死锁 例一
import threading
import time
class Mythread(threading.Thread):
def Ltest(self):
A.acquire()
print(self.name,'Ltest1_A',time.ctime())
time.sleep(2)
print(self.name,'Ltest1_B',time.ctime())
B.acquire()
print(self.name,'Ltest1_A',time.ctime())
time.sleep(2)
print(self.name,'Ltest1_B',time.ctime())
B.release()
A.release()
def Ltest2(self):
B.acquire()
print(self.name, 'Ltest1_A', time.ctime())
time.sleep(2)
print(self.name, 'Ltest1_B', time.ctime())
A.acquire()
print(self.name, 'Ltest2_A', time.ctime())
time.sleep(2)
print(self.name, 'Ltest2_B', time.ctime())
A.release()
B.release()
def run(self):
self.Ltest()
self.Ltest2()
li=[]
A = threading.Lock()
B = threading.Lock()
if __name__ == '__main__':
for i in range(2):
f=Mythread()
f.start()
li.append(f)
# 执行最后一个数值
for x in li:
x.join()
# Ltest第一次执行完所有的锁并释放,再执行Ltest2,B锁开始运行,然后run方法调用Ltest方法(再次执行A锁,A锁运行完执行B锁的), 死锁就出来了,因为Ltest2的B锁还没有释放
## 结果
Thread-1 L1_test1_A Fri Jan 19 17:28:29 2018
Thread-1 L1_test1_B Fri Jan 19 17:28:31 2018
Thread-1 L2_test1_A Fri Jan 19 17:28:31 2018
Thread-1 L2_test1_B Fri Jan 19 17:28:33 2018
Thread-1 L3_test2_A Fri Jan 19 17:28:33 2018
Thread-2 L1_test1_A Fri Jan 19 17:28:33 2018
Thread-1 L3_test2_B Fri Jan 19 17:28:35 2018
Thread-2 L1_test1_B Fri Jan 19 17:28:35 2018
## 死锁解决办法递归锁
rlock 自身维护着一个计数器,每次加一个锁conut +1执行完减一
import threading
import time
class Mythread(threading.Thread):
def Ltest(self):
R.acquire()
print(self.name,'L1_test1_A',time.ctime())
time.sleep(2)
print(self.name,'L1_test1_B',time.ctime())
R.acquire()
print(self.name,'L2_test1_A',time.ctime())
time.sleep(2)
print(self.name,'L2_test1_B',time.ctime())
R.release()
R.release()
def Ltest2(self):
R.acquire()
print(self.name, 'L3_test2_A', time.ctime())
time.sleep(2)
print(self.name, 'L3_test2_B', time.ctime())
R.acquire()
print(self.name, 'L4_test2_A', time.ctime())
time.sleep(2)
print(self.name, 'L4_test2_B', time.ctime())
R.release()
R.release()
def run(self):
self.Ltest()
self.Ltest2()
li=[]
# A = threading.Lock()
# B = threading.Lock()
R=threading.RLock() # 递归锁
if __name__ == '__main__':
for i in range(2):
f=Mythread()
f.start()
li.append(f)
# 执行最后一个数值
for x in li:
x.join()
# 结果
太长了,资源都是抢占方式运行,所以显示出来的信息是不按顺序打印的
10.5 同步与异步
同步 与 异步
同步: 当进程执行到IO操作(等待外部数据)的时候, 如socket recv与send一发一收时,等。同步
异步不等,一直等到数据接收成功,再回来处理
信号量和同步对象 # 需了解的东西
同步事件
让两个线程之间处于一个同步状态
event.wait()# 等待 需要设置一个flag位,False就表示阻塞
event.set()# 如果设置这个了 就表示为True
event.clear()# 将set设置为false
###### 例一
import threading
import time
class Boss(threading.Thread):
def run(self):
print('Boss: 项目急,大家在加油一下')
print(events.isSet())
# 创建事件对象,设置flag为True
events.set()
time.sleep(3)
print(events.isSet())
print('Boss: 下班,大家辛苦啦')
events.set()
class Worker(threading.Thread):
def run(self):
# 默认是faluse,等待其它事件为true时执行
events.wait()
# 线程并发操作,沉睡时间取决于上一个线程的最大时间
print('Worker: 回去又得好慢了,加油干')
time.sleep(1)
events.clear()
events.wait()
print('Worker: 下班咯')
Li_dist=[]
if __name__ == '__main__':
events=threading.Event() # 创建一个事件对象
for L in range(5):
Li_dist.append(Worker())
Li_dist.append(Boss())
for x in Li_dist:
x.start()
for J in Li_dist: # 最后一个线程join
J.join()
print('结束循环')
# 结果
Boss: 项目急,大家在加油一下
False
Worker: 回去又得好慢了,加油干*5
Boss: 下班,大家辛苦啦
Worker: 下班咯*5
结束循环
信号量:
允许有多少线程同时运行
#### 例一
import threading
from time import sleep
class Mythread(threading.Thread):
def run(self):
# 增加一把锁,同时3次
sem.acquire()
print(self.name)
sleep(1)
# 释放锁
sem.release()
Li=[]
# 定义允许同时能有多少个线程能同时运行,类似电梯,一次只能多重
sem = threading.Semaphore(3)
if __name__ == '__main__':
for i in range(20):
Li.append(Mythread())
for L in Li:
L.start()
# 结果
# 每次打印三个
队列------生产者消费者模型
# 队列是用来解决多线程安全的,有多线程的出现才有队列
队列优先级
1、先进先出
2、先进后出
3、优先级
1、先进先出 ### 例一
import queue # 线程队列
# 首先先创建一个对象 比如列表Lis=[]
q=queue.Queue()
q.put(123)
q.put('asf')
q.put({'name':'xiong'})
while 1:
data=q.get()
print(data)
print('-----')
# 结果
123
-----
asf
-----
{'name': 'xiong'}
-----
# 需要注意的是这里并没有打印完,它一直在这里阻塞,同步状态,一直等待下一个值的接收
# 例二:
q=queue.Queue(3)
#设置最大put值,当定义的put值,超过了定义的值的时候,会直接阻塞
# 例三:
q.put('adsf',False)
# 自已定义一个阻塞, 默认是True
raise Full
queue.Full
# 当定义的值超过例二定义的范围时,会报full错误
2、先进后出
q=queue.LifoQueue(3)
# 好比类似一个细长的水壶,往里放小石子,最前的就是最后倒出来的.
3、优先级
import queue
q=queue.PriorityQueue()
q.put() #定义优先级,值越小优先级越高
q.put()
q.put()
while 1:
data=q.get()
print(data)
print('-----')
-----
-----
-----
# 最后这里打印的也是会一直阻塞,等待下一个值的输入
# 其它方法
import queue
q=queue.PriorityQueue()
q.put()
q.put()
q.put()
print('qsize: ',q.qsize())
print('是否full:',q.full())
print('是否为空:',q.empty())
while 1:
data=q.get()
print(data)
print('-----')
# 结果
qsize:3 # 这里不是对象定义的值,而是根据put做出的判断,有多少个put就有多少个值
是否full: False # 没满
是否为空: False# 不是为空, 可以用于其它程序的判断,比如我只让你输入二个,你输入三个就报错
-----
-----
-----
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
10.6 生产者-消费者模型
### 例一
import queue
import time
import threading
# 生成一个线程对象
q=queue.Queue()
def Producer(name):
count=0
while count < 10:
time.sleep(2)
q.put(count)
# q.task_done() # 发送包子已经制作好
print('Producer %s makeing %s baizi'%(name,count))
q.join()
print('realy make baizi.....')
count+=1
def Consumer(name):
count=0
while count < 10:
time.sleep(3) # 等待
# if not q.empty():# 判断如果是空 那么就应该显示没有包子
# print('wait........')
# q.join() # 等待生产者发送包子作好的消息
# 消费者开始获取包子
data=q.get()
# 消费发送信息到队列中说包子已经开吃了, 生产者又继续开始制作下一个包子
q.task_done()
print('eat.....')
print('Consumer %s eat %s baizi'%(name,data))
# else:
# print('not baizi')
count+=1
P1=threading.Thread(target=Producer,args=('A'))
C1=threading.Thread(target=Consumer,args=('B'))
C2=threading.Thread(target=Consumer,args=('C'))
C3=threading.Thread(target=Consumer,args=('D'))
P1.start()
C1.start()
C3.start()
C2.start()
10.7 多进程模块
多进程模块multiprocessing
开启10进程以内可接受,原因python因为GIL一次只能运行一个线程,严重导致了多核CPU的使用
# 例一
# 导入多进程模块
import multiprocessing
import os
def Mympro(name):
print('name %s'%name)
print('master ppid: ',os.getppid())
print('slave pid: ',os.getpid())
if __name__ == '__main__':
p=multiprocessing.Process(target=Mympro,args=('xiong',))# 严重注意的是: args传递参数需要有一个逗号 ,不然会报格式错误
p.start()
p.join()
# 结果
name xiong
master ppid:7648# 父进程id
slave pid:7656# 子进程id
### 例二 继承
from multiprocessing import Process
import os
class Mympro(Process):
def __init__(self,name):
super(Mympro,self).__init__()
# Process().__init__(self)
self.name=name
def run(self):
print('name: ',self.name)
print('master ppid: ',os.getppid())
print('slave pid: ',os.getpid())
if __name__ == '__main__':
M=Mympro('xiong')
M.start()
M.join()
# 结果
name:xiong
master ppid:3320
slave pid:7516
# 进程
实例方法:
run(): start()调用run方法
terminate(): 不管任务是否完成,立即停止工作过程
属性
daemon: 跟线程的setDaemon一样
进程间通信:
进程互相通信,共享通信 ,队列与管道只能实现数据交互,没有实现数据共享,manager可以用于数据之间共享
1、进程队列Queue
2、管道
3、manager
4、数据同步
5、进程池
# 进程队列
from multiprocessing import Process,Queue
# 继承多进程
class MultiP(Process):
def __init__(self,q):
super(MultiP,self).__init__()
self.q = q
# 重新构造run方法,start直接会调用这个
def run(self):
self.q.put('12321')
print('son id: ',id(self.q))
if __name__ == '__main__':
q=Queue() # 等于multiprocessing.Queue
# 创建的进程队列必须先传递到子进程,也就是先复制一份资源过去,然后才能进行其它的操作
M=MultiP(q)
M.start()
print('main id: ',id(q))
print('mian process: ',q.get())
# 结果
main id:39167816
son id:47022208
mian process:12321
2、管道
from multiprocessing import Process,Pipe
# 驼峰的还是比较漂亮的
def MultiP(conn):
conn.send()
data=conn.recv()
print(data)
print('son id: ',id(conn))
conn.close()
if __name__ == '__main__':
# 这个Pipe类似于socket的conn一样,前面是描述信息,后面是地址
# 这里创建的是两个对象,主一个,子一个用于进程间传递信息
main_conduit, son_conduit = Pipe()
M=Process(target=MultiP,args=(son_conduit,))
M.start()
print(main_conduit.recv())
main_conduit.send('11111')# 可以是字符串,列表,字典等
M.join()
print('main id: ', id(son_conduit))
# 结果
11111
son id:47170728
main id:43719144
3、manager
from multiprocessing import Process,Manager
def MultiP(Li,Di,i):
Di['2']='2'
Di='1'
Li.append(i)
lis=[]
if __name__ == '__main__':
# # manager=Manager()
with Manager() as manager:
Li=manager.list(range(5))
Di=manager.dict()
for i in range(5):
# 必须先将定义的manager的字典传递到进进程才能操作,类似浅拷贝
M=Process(target=MultiP,args=(Li,Di,i))
M.start()
lis.append(M)
for x in lis:
# 进程队列需要先运行子进程然后再运行父进程,否则会报错
x.join()
print(Di)
print(Li)
# 结果
{'2': '2', 0: '1', 1: '1', 2: '1', 4: '1', 3: '1'}
4、数据同步
当加了一把同步锁之后, 不管是进程还是线程 都是串行
from multiprocessing import Process,Lock
from time import sleep
def MultiP(lock,nums):
lock.acquire()
sleep(1)
print('main id: %s' %nums)
lock.release()
if __name__ == '__main__':
lock=Lock()
# 启动10个进程
li=[]
for i in range(10):
M=Process(target=MultiP,args=(lock,i))
M.start()
li.append(M)
for L in li:
L.join()
# 结果:数据同步是同步方式,这是为了保证数据安全,类似tcp操作
5、进程池
apply 同步
apply_async 异步
# 回调函数: 就是某个动作或者函数执行成功后,再去执行的函数
好处: 主进程直接调用回调函数,节省内存资源,如日志
回调函数接收的值来自于 子进程的 return值
from multiprocessing import Process,Pool
from time import sleep
import os
def Foo(nums):
sleep(1)
print('son process:',os.getpid())
print(nums)
return 'Foo: %s'%nums
# 这里的args接受的值来自于Foo函数的return值
def Pro(args):
print(args)
if __name__ == '__main__':
# 定义最大进程池,同时能打开多少个进程,默认是机器的CPU核数
pool=Pool(5)
for i in range(20):
# 异步进程池,callback回调函数,这里回调函数是由主线程运行的
pool.apply_async(func=Foo,args=(i,),callback=Pro)
# 这里的格式是固定的 close , join
pool.close()
pool.join()
print('end')
# 打印结果:每次五个 首先先打印子进程内容,然后再打印回调函数,依次执行完进程,最后打印end
6、协程 协作式, 非抢占式
用户态: 通过用户自己进行操作的模式
协程主要解决的也是IO操作的时候
协程:本质上也是一个线程
协程的优势:
1、没有切换的消耗
2、没有锁的概念 (本身就是一个线程,遇到IO就切换)
缺点: 不能使用多核,(单线程)但可以采用多进程+协程, 一个很好的解决并发的方案
python自带的yield
#### yield基础用法 例一:
def Yi():
print('ok')
while True:
x=yield
print(x)
y=Yi()
next(y)
y.send(1)# 发送一个值到yield x接受并打印,没有while就会直接打印出 StopIteration
#### 例二
def Yi():
print('ok')
while True:
x=yield
print('resv from Y2 value: %s'%x)
def Y2():
next(y)
n=0
while n<=4:
print('relay send Yi value %s'%(n))
y.send(n)
n+=2
if __name__ == '__main__':
y=Yi()
p=Y2()
# 结果
ok# 使用next打印生成器中的值
relay send Yi value 0# send发送值到Y1,yield接收并赋值给x,然后打印
resv from Y2 value: 0# 使用的是同一个线程,所以没有切换的损耗
relay send Yi value 2
resv from Y2 value: 2
relay send Yi value 4
resv from Y2 value: 4
greenlet
switch()启动 切换
缺点: 每一个需求都需要手动switch切换
py3.6没有这个模块了
gevent 重要
页:
[1]