小fish 发表于 2018-8-8 07:12:31

python_day10_并发编程

  10.1 操作系统
  分时多道技术
  10.2 进程、线程
  10.3 GIL: 全局解释锁
  10.4 锁
  同步锁
  死锁 递归锁
  10.5 同步 与 异步
  同步事件、信号量
  队列
  10.6 生产者-消费者模型
  10.7 多进程模块
  10.8 进程间通信
  进程队列Queue
  管道
  manager
  数据同步
  进程池
  10.1 操作系统
  操作系统的作用:
  1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口
  2:管理、调度进程,并且将多个进程对硬件的竞争变得有序
  多道技术:
  1.产生背景:针对单核,实现并发
  ps:
  现在的主机一般是多核,那么每个核都会利用多道技术
  有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个
  cpu中的任意一个,具体由操作系统调度算法决定。
  2.空间上的复用:如内存中同时有多道程序
  3.时间上的复用:复用一个cpu的时间片
  强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样
  才能保证下次切换回来时,能基于上次切走的位置继续运行
  多道程序设计:
  在不同用户遇到IO进行切换操作,当用户A IO阻塞时,切换到B用户,当B执行完再处理A
  SPOOLING: 外部设备联机操作
  分时操作系统: 多个联机操作 + 多道程序设计    类似原先网吧的分屏技术
  10.2进程、线程
  进程
  本质上就是一段程序运行过程(一段抽象概念) 就是一个过程
  定义: 进程就是一个程序在数据集上的一次动态执行过程
  进程一般是由程序、数据集、进程控制块三部分组成
  数据集: 程序运行过程中所需的一切数据资源
  进程控制块: 目的是用于切换,状态保存,恢复操作
  进程切换: IO切换,时间轮询切换
  进程:最小的资源单元
  线程:
  特点:共享整个进程的内存,数据集
  单个的线程不可能脱离进程而存在
  一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行
  线程:最小的执行单元,微进程 (运行实例)
  一个进程至少得有一个线程
  程序运行在内存中,CPU在取内存中的数据拿出并执行,
  1.什么是进程?进程和程序之间有什么区别?
  进程:一个程序的执行实例称为进程;
  每个进程都提供执行程序所需的资源。
  进程有一个虚拟地址空间、可执行代码、对系统对象的开放句柄、一个安全上下文、一个惟一的进程标识符、环境变量、一个优先级类、最小和最大工作集大小,以及至少一个执行线程;
  每个进程都由一个线程启动,这个线程通常被称为主线程,但是可以从它的任何线程中创建额外的线程;
  程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程;
  程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。
  在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行,大大提高了CPU的利用率
  2.什么是线程?
  进程的缺点有:
  进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。
  进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。
  线程是操作系统能够进行运算调度的最小单位。
  它被包含在进程之中,是进程中的实际运作单位。
  一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
  线程是一个执行上下文,它是一个CPU用来执行指令流的所有信息。
  3.进程和线程之间的关系?
  线程共享创建它的进程的地址空间;进程有自己的地址空间。(内存地址)
  线程可以直接访问其进程的数据段;进程有自己的父进程数据段的副本。
  线程可以直接与进程的其他线程通信;进程必须使用进程间通信来与兄弟进程通信。
  新线程很容易创建;新进程需要复制父进程。
  线程可以对同一进程的线程进行相当大的控制;进程只能对子进程执行控制。
  对主线程的更改(取消、优先级更改等)可能会影响流程的其他线程的行为;对父进程的更改不会影响子进程。
  程序计数器: 真正保存的就是一个内存地址, 上下文切换的时候在取出上一次的运行状态,并进程恢复执行操作
  ############# 进程运行,生成子线程,主子线程开始顺序运行,遇至 IO阻塞开始执行下一条。
import threading  
import time
  

  
def Tv():
  
    print(time.ctime())
  
    print('look Tv')
  
    time.sleep(2)
  
    print(time.ctime())
  

  
# 生成一个对象
  
T=threading.Thread(target=Tv)
  
# 跟socket一样调用threading父类的start方法执行
  
T.start()
  
print('start ')
  

  
# 结果
  Thu Jan 18 15:00:50 2018
  look Tv
  start
  Thu Jan 18 15:00:52 2018
  ############   线程之间可以并发执行顺序执行 ,执行完主线程然后再执行子线程,直到程序退出
import threading  
import time
  

  
def Tv():
  
    print('tv show start time ',time.ctime())
  
    print('look Tv')
  
    time.sleep(2)
  
    print('tv show end time ',time.ctime())
  

  
def Eat():
  
    print('eat start time ',time.ctime())
  
    print('eat ')
  
    time.sleep(2)
  
    print('eat end time ',time.ctime())
  

  
# 生成一个对象
  
T=threading.Thread(target=Tv)
  
# 跟socket一样调用threading父类的start方法执行
  
T.start()
  
T2=threading.Thread(target=Eat)
  
T2.start()
  

  
print('start ')
  

  
## 结果
  
tv show start timeThu Jan 18 15:06:00 2018
  
look Tv
  
eat start timeThu Jan 18 15:06:00 2018
  
eat
  
start
  
tv show end timeThu Jan 18 15:06:02 2018
  
eat end timeThu Jan 18 15:06:02 2018
  ########当程序中有join时,会先执行子线程的程序,最后再去执行主线程的代码,并发运行
import threading  
import time
  

  
def Tv():
  
    print('tv show start time ',time.ctime())
  
    print('look Tv')
  
    time.sleep(2)
  
    print('tv show end time ',time.ctime())
  

  
def Eat():
  
    print('eat start time ',time.ctime())
  
    print('eat ')
  
    time.sleep(2)
  
    print('eat end time ',time.ctime())
  

  
# 生成一个对象
  
T=threading.Thread(target=Tv)
  

  
T2=threading.Thread(target=Eat)
  

  
T.start()
  
T2.start()
  

  
T.join()
  
T2.join()
  
print('start ')
  

  
## 结果
  
tv show start timeThu Jan 18 15:14:15 2018
  
look Tv
  
eat start timeThu Jan 18 15:14:15 2018
  
eat
  
tv show end timeThu Jan 18 15:14:17 2018
  
eat end timeThu Jan 18 15:14:17 2018
  
start
  ######## 如果join在start前,那么就会先执行子线程中的程序 然后再执行下一段子线程,就无法实现并发的效果了
  setDaemon(True) 守护线程
  子线程是否要守护主线程, 当主线程退出之后, 子线程也会直接跟着退出
  其它方法:
  Thread实例对象的方法
  # isAlive(): 返回线程是否活动的。
  # getName(): 返回线程名。
  # setName(): 设置线程名。
  # 继承 threading.Thread 需要重写一下run方法
class Mythread(threading.Thread):  
    def __init__(self,num):
  
      # 继承父类的 init方法
  
      threading.Thread.__init__(self)
  
      self.num = num
  
    # 重写run方法 跟sockserver的header方法一样
  
    def run(self):
  
      print('测试一下: %s' %self.num)
  
      time.sleep(2)
  

  
if __name__ == '__main__':
  
    t1=Mythread(11111)
  
    t2=Mythread(22222)
  
    t1.start()# 使用start方法,类继承方法必须要有run方法
  
    t2.start()
  
    print(t1.getName())   # Thread-1
  
    t1.setName('test_Thread')
  
    print(t1.getName())   # test_Thread
  

  
    print('主线程跑起来')
  

  
# 结果
  测试一下: 11111
  测试一下: 22222
  主线程跑起来
  10.3. python GIL全局解释器锁
  无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行
  任务分为两部分:IO密集型:计算密集型
  sleep等同于IO操作
  对于IO密集型的任务:    python的多线程的是有意义的
  可以采用多进程+协程(也解决IO密集型问题)
  对于计算密集型的任务:python的多线程就不推荐,
  10.4锁
  同步锁:
lock=threading.Lock()创建一个对象  
lock.acquire()锁住,只能有一个线程被执行
  
代码块
  
lock.release()执行完释放
  

  

  
####### 例一
  
import threading
  
import time
  
num=100
  
def Nums():
  
    global num
  
    lock.acquire()
  
    temp=num
  
    temp-=1
  
    # 当睡眠时间为1时,结果一定是99 同时执行100个线程,每次取出的结果都为99
  
    time.sleep(0.01)
  
    num=temp
  
    lock.release()
  
    print('k')
  

  
s=[]
  
# 增加同步锁功能就跟异步类似,但它也能同时并发操作,比如在加锁前与加锁后进行打印。
  
lock=threading.Lock()       # 同步锁
  

  
if __name__ == '__main__':
  

  
    for i in range(100):
  
      i=threading.Thread(target=Nums)
  
      i.start()   #每个对象都需要start一次
  
      s.append(i) #附加
  
    for x in s:
  
      x.join()    # 最后一次的join
  
    print(num)      # 主线程
  死锁 递归锁:
  线程间共享多个资源,两个线程分别占有一部分资源,并且同时等待对方释放,就会造成死锁,
##### 死锁 例一  
import threading
  
import time
  
class Mythread(threading.Thread):
  
    def Ltest(self):
  
      A.acquire()
  
      print(self.name,'Ltest1_A',time.ctime())
  
      time.sleep(2)
  
      print(self.name,'Ltest1_B',time.ctime())
  

  
      B.acquire()
  
      print(self.name,'Ltest1_A',time.ctime())
  
      time.sleep(2)
  
      print(self.name,'Ltest1_B',time.ctime())
  
      B.release()
  

  
      A.release()
  

  
    def Ltest2(self):
  
      B.acquire()
  
      print(self.name, 'Ltest1_A', time.ctime())
  
      time.sleep(2)
  
      print(self.name, 'Ltest1_B', time.ctime())
  

  
      A.acquire()
  
      print(self.name, 'Ltest2_A', time.ctime())
  
      time.sleep(2)
  
      print(self.name, 'Ltest2_B', time.ctime())
  
      A.release()
  

  
      B.release()
  

  

  
    def run(self):
  
      self.Ltest()
  
      self.Ltest2()
  

  
li=[]
  
A = threading.Lock()
  
B = threading.Lock()
  

  
if __name__ == '__main__':
  
    for i in range(2):
  
      f=Mythread()
  
      f.start()
  
      li.append(f)
  

  
    # 执行最后一个数值
  
    for x in li:
  
      x.join()
  

  
    # Ltest第一次执行完所有的锁并释放,再执行Ltest2,B锁开始运行,然后run方法调用Ltest方法(再次执行A锁,A锁运行完执行B锁的), 死锁就出来了,因为Ltest2的B锁还没有释放
  

  
    ## 结果
  
      Thread-1 L1_test1_A Fri Jan 19 17:28:29 2018
  
      Thread-1 L1_test1_B Fri Jan 19 17:28:31 2018
  
      Thread-1 L2_test1_A Fri Jan 19 17:28:31 2018
  
      Thread-1 L2_test1_B Fri Jan 19 17:28:33 2018
  
      Thread-1 L3_test2_A Fri Jan 19 17:28:33 2018
  
      Thread-2 L1_test1_A Fri Jan 19 17:28:33 2018
  
      Thread-1 L3_test2_B Fri Jan 19 17:28:35 2018
  
      Thread-2 L1_test1_B Fri Jan 19 17:28:35 2018
  ## 死锁解决办法递归锁
  rlock 自身维护着一个计数器,每次加一个锁conut +1执行完减一
import threading  
import time
  
class Mythread(threading.Thread):
  
    def Ltest(self):
  
      R.acquire()
  
      print(self.name,'L1_test1_A',time.ctime())
  
      time.sleep(2)
  
      print(self.name,'L1_test1_B',time.ctime())
  

  
      R.acquire()
  
      print(self.name,'L2_test1_A',time.ctime())
  
      time.sleep(2)
  
      print(self.name,'L2_test1_B',time.ctime())
  
      R.release()
  

  
      R.release()
  

  
    def Ltest2(self):
  
      R.acquire()
  
      print(self.name, 'L3_test2_A', time.ctime())
  
      time.sleep(2)
  
      print(self.name, 'L3_test2_B', time.ctime())
  

  
      R.acquire()
  
      print(self.name, 'L4_test2_A', time.ctime())
  
      time.sleep(2)
  
      print(self.name, 'L4_test2_B', time.ctime())
  
      R.release()
  

  
      R.release()
  

  
    def run(self):
  
      self.Ltest()
  
      self.Ltest2()
  

  
li=[]
  
# A = threading.Lock()
  
# B = threading.Lock()
  

  
R=threading.RLock()   # 递归锁
  

  
if __name__ == '__main__':
  
    for i in range(2):
  
      f=Mythread()
  
      f.start()
  
      li.append(f)
  

  
    # 执行最后一个数值
  
    for x in li:
  
      x.join()
  

  
# 结果
  太长了,资源都是抢占方式运行,所以显示出来的信息是不按顺序打印的
  10.5 同步与异步
  同步 与 异步
  同步: 当进程执行到IO操作(等待外部数据)的时候, 如socket recv与send一发一收时,等。同步
  异步不等,一直等到数据接收成功,再回来处理
  信号量和同步对象   # 需了解的东西
  同步事件
  让两个线程之间处于一个同步状态
event.wait()# 等待 需要设置一个flag位,False就表示阻塞  
event.set()# 如果设置这个了 就表示为True
  
event.clear()# 将set设置为false
  

  
###### 例一
  
import threading
  
import time
  

  
class Boss(threading.Thread):
  
    def run(self):
  
      print('Boss: 项目急,大家在加油一下')
  
      print(events.isSet())
  
      # 创建事件对象,设置flag为True
  
      events.set()
  
      time.sleep(3)
  
      print(events.isSet())
  
      print('Boss: 下班,大家辛苦啦')
  
      events.set()
  
class Worker(threading.Thread):
  
    def run(self):
  
      # 默认是faluse,等待其它事件为true时执行
  
      events.wait()
  
      # 线程并发操作,沉睡时间取决于上一个线程的最大时间
  
      print('Worker: 回去又得好慢了,加油干')
  
      time.sleep(1)
  
      events.clear()
  
      events.wait()
  
      print('Worker: 下班咯')
  

  
Li_dist=[]
  
if __name__ == '__main__':
  
    events=threading.Event()    # 创建一个事件对象
  
    for L in range(5):
  
      Li_dist.append(Worker())
  
    Li_dist.append(Boss())
  
    for x in Li_dist:
  
      x.start()
  
    for J in Li_dist:       # 最后一个线程join
  
      J.join()
  

  
    print('结束循环')
  

  
# 结果
  Boss: 项目急,大家在加油一下
  False
  Worker: 回去又得好慢了,加油干*5
  Boss: 下班,大家辛苦啦
  Worker: 下班咯*5
  结束循环
  信号量:
  允许有多少线程同时运行
#### 例一  
import threading
  
from time import sleep
  
class Mythread(threading.Thread):
  
    def run(self):
  
      # 增加一把锁,同时3次
  
      sem.acquire()
  
      print(self.name)
  
      sleep(1)
  
      # 释放锁
  
      sem.release()
  

  
Li=[]
  
# 定义允许同时能有多少个线程能同时运行,类似电梯,一次只能多重
  
sem = threading.Semaphore(3)
  

  
if __name__ == '__main__':
  
    for i in range(20):
  
      Li.append(Mythread())
  
    for L in Li:
  
      L.start()
  

  
# 结果
  
# 每次打印三个
  队列------生产者消费者模型
  # 队列是用来解决多线程安全的,有多线程的出现才有队列
  队列优先级
  1、先进先出
  2、先进后出
  3、优先级
1、先进先出  ### 例一
  import queue    # 线程队列
  

  
# 首先先创建一个对象 比如列表Lis=[]
  
q=queue.Queue()
  

  
q.put(123)
  
q.put('asf')
  
q.put({'name':'xiong'})
  

  
while 1:
  
    data=q.get()
  
    print(data)
  
    print('-----')
  

  
# 结果
  
123
  
-----
  
asf
  
-----
  
{'name': 'xiong'}
  
-----
  
# 需要注意的是这里并没有打印完,它一直在这里阻塞,同步状态,一直等待下一个值的接收
  

  
# 例二:
  
q=queue.Queue(3)
  
#设置最大put值,当定义的put值,超过了定义的值的时候,会直接阻塞
  

  
# 例三:
  
q.put('adsf',False)
  
# 自已定义一个阻塞, 默认是True
  raise Full
  queue.Full
  
# 当定义的值超过例二定义的范围时,会报full错误
  

  
2、先进后出
  
q=queue.LifoQueue(3)
  
# 好比类似一个细长的水壶,往里放小石子,最前的就是最后倒出来的.
  

  

  
3、优先级
  
import queue
  
q=queue.PriorityQueue()
  
q.put()               #定义优先级,值越小优先级越高
  
q.put()
  
q.put()
  
while 1:
  
    data=q.get()
  
    print(data)
  
    print('-----')
  

  

  
  -----
  
  -----
  
  -----
  
# 最后这里打印的也是会一直阻塞,等待下一个值的输入
  

  

  
# 其它方法
  
import queue
  
q=queue.PriorityQueue()
  
q.put()
  
q.put()
  
q.put()
  
print('qsize: ',q.qsize())
  
print('是否full:',q.full())
  
print('是否为空:',q.empty())
  
while 1:
  
    data=q.get()
  
    print(data)
  
    print('-----')
  

  

  
# 结果
  
qsize:3       # 这里不是对象定义的值,而是根据put做出的判断,有多少个put就有多少个值
  
是否full: False # 没满
  
是否为空: False# 不是为空, 可以用于其它程序的判断,比如我只让你输入二个,你输入三个就报错
  

  
-----
  

  
-----
  

  
-----
  

  
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
  
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
  10.6 生产者-消费者模型
### 例一  
import queue
  
import time
  
import threading
  
# 生成一个线程对象
  
q=queue.Queue()
  

  
def Producer(name):
  
    count=0
  
    while count < 10:
  
      time.sleep(2)
  
      q.put(count)
  
      # q.task_done()       # 发送包子已经制作好
  
      print('Producer %s makeing %s baizi'%(name,count))
  
      q.join()
  
      print('realy make baizi.....')
  

  
      count+=1
  

  
def Consumer(name):
  
    count=0
  
    while count < 10:
  

  
            time.sleep(3)       # 等待
  
      # if not q.empty():# 判断如果是空 那么就应该显示没有包子
  
      #   print('wait........')
  
            # q.join()            # 等待生产者发送包子作好的消息
  
            # 消费者开始获取包子
  
            data=q.get()
  
            # 消费发送信息到队列中说包子已经开吃了, 生产者又继续开始制作下一个包子
  
            q.task_done()
  
            print('eat.....')
  
            print('Consumer %s eat %s baizi'%(name,data))
  
      # else:
  
      #   print('not baizi')
  
            count+=1
  

  
P1=threading.Thread(target=Producer,args=('A'))
  
C1=threading.Thread(target=Consumer,args=('B'))
  
C2=threading.Thread(target=Consumer,args=('C'))
  
C3=threading.Thread(target=Consumer,args=('D'))
  
P1.start()
  
C1.start()
  
C3.start()
  
C2.start()
  10.7 多进程模块
  多进程模块multiprocessing
  开启10进程以内可接受,原因python因为GIL一次只能运行一个线程,严重导致了多核CPU的使用
  # 例一
  # 导入多进程模块
import multiprocessing  
import os
  
def Mympro(name):
  
    print('name %s'%name)
  
    print('master ppid: ',os.getppid())
  
    print('slave pid: ',os.getpid())
  

  
if __name__ == '__main__':
  
    p=multiprocessing.Process(target=Mympro,args=('xiong',))# 严重注意的是: args传递参数需要有一个逗号 ,不然会报格式错误
  
    p.start()
  
    p.join()
  

  
# 结果
  
name xiong
  
master ppid:7648# 父进程id
  
slave pid:7656# 子进程id
  ### 例二 继承
from multiprocessing import Process  
import os
  
class Mympro(Process):
  
    def __init__(self,name):
  
      super(Mympro,self).__init__()
  
      # Process().__init__(self)
  
      self.name=name
  

  
    def run(self):
  
      print('name: ',self.name)
  
      print('master ppid: ',os.getppid())
  
      print('slave pid: ',os.getpid())
  

  
if __name__ == '__main__':
  
    M=Mympro('xiong')
  
    M.start()
  
    M.join()
  

  
# 结果
  
name:xiong
  
master ppid:3320
  
slave pid:7516
  # 进程
  实例方法:
  run(): start()调用run方法
  terminate(): 不管任务是否完成,立即停止工作过程
  属性
  daemon: 跟线程的setDaemon一样
  进程间通信:
  进程互相通信,共享通信 ,队列与管道只能实现数据交互,没有实现数据共享,manager可以用于数据之间共享
  1、进程队列Queue
  2、管道
  3、manager
  4、数据同步
  5、进程池
# 进程队列  
from multiprocessing import Process,Queue
  

  
# 继承多进程
  
class MultiP(Process):
  
    def __init__(self,q):
  
      super(MultiP,self).__init__()
  
      self.q = q
  
    # 重新构造run方法,start直接会调用这个
  
    def run(self):
  
      self.q.put('12321')
  
      print('son id: ',id(self.q))
  

  
if __name__ == '__main__':
  
    q=Queue()   # 等于multiprocessing.Queue
  
    # 创建的进程队列必须先传递到子进程,也就是先复制一份资源过去,然后才能进行其它的操作
  
    M=MultiP(q)
  
    M.start()
  
    print('main id: ',id(q))
  
    print('mian process: ',q.get())
  

  
# 结果
  
main id:39167816
  
son id:47022208
  
mian process:12321
  2、管道
from multiprocessing import Process,Pipe  
# 驼峰的还是比较漂亮的
  

  
def MultiP(conn):
  
    conn.send()
  
    data=conn.recv()
  
    print(data)
  
    print('son id: ',id(conn))
  
    conn.close()
  

  
if __name__ == '__main__':
  
    # 这个Pipe类似于socket的conn一样,前面是描述信息,后面是地址
  
    # 这里创建的是两个对象,主一个,子一个用于进程间传递信息
  
    main_conduit, son_conduit = Pipe()
  
    M=Process(target=MultiP,args=(son_conduit,))
  
    M.start()
  
    print(main_conduit.recv())
  
    main_conduit.send('11111')# 可以是字符串,列表,字典等
  
    M.join()
  
    print('main id: ', id(son_conduit))
  

  
# 结果
  

  
11111
  
son id:47170728
  
main id:43719144
  3、manager
from multiprocessing import Process,Manager  

  
def MultiP(Li,Di,i):
  
    Di['2']='2'
  
    Di='1'
  
    Li.append(i)
  

  
lis=[]
  
if __name__ == '__main__':
  
#   # manager=Manager()
  
    with Manager() as manager:
  
      Li=manager.list(range(5))
  
      Di=manager.dict()
  

  
      for i in range(5):
  
            # 必须先将定义的manager的字典传递到进进程才能操作,类似浅拷贝
  
            M=Process(target=MultiP,args=(Li,Di,i))
  
            M.start()
  
            lis.append(M)
  

  
      for x in lis:
  
            # 进程队列需要先运行子进程然后再运行父进程,否则会报错
  
            x.join()
  

  
      print(Di)
  
      print(Li)
  

  
# 结果
  {'2': '2', 0: '1', 1: '1', 2: '1', 4: '1', 3: '1'}
  
  4、数据同步
  当加了一把同步锁之后, 不管是进程还是线程 都是串行
from multiprocessing import Process,Lock  
from time import sleep
  

  
def MultiP(lock,nums):
  
    lock.acquire()
  
    sleep(1)
  
    print('main id: %s' %nums)
  
    lock.release()
  

  
if __name__ == '__main__':
  
    lock=Lock()
  
    # 启动10个进程
  
    li=[]
  
    for i in range(10):
  
      M=Process(target=MultiP,args=(lock,i))
  
      M.start()
  
      li.append(M)
  

  
    for L in li:
  
      L.join()
  
# 结果:数据同步是同步方式,这是为了保证数据安全,类似tcp操作
  5、进程池
  apply    同步
  apply_async   异步
  # 回调函数: 就是某个动作或者函数执行成功后,再去执行的函数
  好处: 主进程直接调用回调函数,节省内存资源,如日志
  回调函数接收的值来自于 子进程的 return值
from multiprocessing import Process,Pool  
from time import sleep
  
import os
  
def Foo(nums):
  
    sleep(1)
  
    print('son process:',os.getpid())
  
    print(nums)
  
    return 'Foo: %s'%nums
  

  
# 这里的args接受的值来自于Foo函数的return值
  
def Pro(args):
  
    print(args)
  

  
if __name__ == '__main__':
  
    # 定义最大进程池,同时能打开多少个进程,默认是机器的CPU核数
  
    pool=Pool(5)
  
    for i in range(20):
  
      # 异步进程池,callback回调函数,这里回调函数是由主线程运行的
  
      pool.apply_async(func=Foo,args=(i,),callback=Pro)
  
    # 这里的格式是固定的 close , join
  
    pool.close()
  
    pool.join()
  
    print('end')
  

  
# 打印结果:每次五个 首先先打印子进程内容,然后再打印回调函数,依次执行完进程,最后打印end
  6、协程    协作式, 非抢占式
  用户态: 通过用户自己进行操作的模式
  协程主要解决的也是IO操作的时候
  协程:本质上也是一个线程
  协程的优势:
  1、没有切换的消耗
  2、没有锁的概念 (本身就是一个线程,遇到IO就切换)
  缺点: 不能使用多核,(单线程)但可以采用多进程+协程, 一个很好的解决并发的方案
python自带的yield  
#### yield基础用法 例一:
  def Yi():
  print('ok')
  while True:
  x=yield
  print(x)
  

  y=Yi()
  next(y)
  y.send(1)# 发送一个值到yield x接受并打印,没有while就会直接打印出 StopIteration
  
#### 例二
  
def Yi():
  
    print('ok')
  
    while True:
  
      x=yield
  
      print('resv from Y2 value: %s'%x)
  

  
def Y2():
  
    next(y)
  
    n=0
  
    while n<=4:
  
      print('relay send Yi value %s'%(n))
  
      y.send(n)
  
      n+=2
  

  
if __name__ == '__main__':
  
    y=Yi()
  
    p=Y2()
  

  
# 结果
  
    ok# 使用next打印生成器中的值
  
    relay send Yi value 0# send发送值到Y1,yield接收并赋值给x,然后打印
  
    resv from Y2 value: 0# 使用的是同一个线程,所以没有切换的损耗
  
    relay send Yi value 2
  
    resv from Y2 value: 2
  
    relay send Yi value 4
  
    resv from Y2 value: 4
  greenlet
  switch()启动 切换
  缺点: 每一个需求都需要手动switch切换
  py3.6没有这个模块了
  gevent   重要
页: [1]
查看完整版本: python_day10_并发编程