czhtr 发表于 2018-8-4 07:31:35

python多进程—multiprocessing

  一、进程
  python中提供多进程包:multiprocessing,支持子进程,通信,共享内存,执行不同形式的同步,提供了Process、Pipi、Lock等组件
  多进程和多线程区别:
  多线程使用的是CPU的一个核,适合IO密集型
  多进程使用的是CPU的多个核,适合运算密集型
  1)multiprocessing的方法
  cpu_count():统计cpu总数
  active_children():获取所有子进程
  例子:
#!/usr/bin/env python  
import multiprocessing
  

  
p = multiprocessing.cpu_count()
  
m = multiprocessing.active_children()
  

  
print(p)
  
print(m)
  运行结果:
  8
  []
  2)Process进程
  创建一个Process对象:p = multiprocessing.Precess(target=worker,args=(2,))
  说明:
  target = 函数名字
  args = 函数需要的的参数,以tuple形式传入
  3)Process常用方法
  is_alive():判断进程是否存活
  run():启动进程
  start():启动进程,会自动调用run方法,常用
  join(timeout=):等待进程结束或者直到超时
  4)Process常用属性
  name:进程名字
  pid:进程的pid
  例子:
#!/usr/bin/env python  
import time
  
import multiprocessing
  

  
def worker(interval):
  
    time.sleep(interval)
  
    print("hello,China")
  

  
if __name__ == "__main__":
  
    p = multiprocessing.Process(target=worker,args=(5,))
  

  
    p.start()
  

  
    print(p.is_alive())
  
    p.join(timeout=3)# 只等待3秒,如果进程还没结束,则向下执行print(p.name)
  

  
    print(p.name)
  
    print(p.pid)
  
    print("This is end")
  运行结果:
  True
  Process-1
  121764
  This is end
  hello,China
  实例: 多进程
import time  
import multiprocessing
  

  
def worker(name,interval):
  
    print("{0} start" .format(name))
  
    time.sleep(interval)
  
    print("{0} end" .format(name))
  

  
if __name__ == "__main__":
  
    print("main start")
  
    print("The computer has {0} core" .format(multiprocessing.cpu_count()))
  

  
    p1 = multiprocessing.Process(target=worker,args=("worker",2))
  
    p2 = multiprocessing.Process(target=worker,args=("worker",3))
  
    p3 = multiprocessing.Process(target=worker,args=("worker",4))
  

  
    p1.start()
  
    p2.start()
  
    p3.start()
  

  
    for p inmultiprocessing.active_children():
  
      print("The pid of {0} is {1}" .format(p.name,p.pid) )
  
    print("main end")
  运行结果:
  main start
  The computer has 4 core
  The pid of Process-1 is 21112
  The pid of Process-3 is 20536
  The pid of Process-2 is 2116
  main end
  worker start
  worker start
  worker start
  worker end
  worker end
  worker end
  说明:启动的多个进程之间都是相互独立存在的
  二、lock组件
  当我们用多进程来读写文件时,如果一个写一个读同时进行时不行的,必须一个写完之后,另一个才可以读。因此需要用到一个锁机制进行控制
  实例1:多进程不加锁
import multiprocessing  
import time
  

  
def add(number,value,lock):
  
    print("init,member = {1}".format(value,number))
  
    for i in xrange(1,6):
  
      number += value
  
      time.sleep(1)
  
      print("add {0},number = {1}".format(value,number))
  

  
if __name__ == "__main__":
  
    lock = multiprocessing.Lock()
  
    number = 0
  

  
    p1 = multiprocessing.Process(target=add,args=(number,1,lock))
  
    p3 = multiprocessing.Process(target=add,args=(number,3,lock))
  

  
    p1.start()
  
    p3.start()
  

  
    print("main end")
  运行结果:
  main end
  init,member = 0
  init,member = 0
  add 1,number = 1
  add 3,number = 3
  add 1,number = 2
  add 3,number = 6
  add 1,number = 3
  add 3,number = 9
  add 1,number = 4
  add 3,number = 12
  add 1,number = 5
  add 3,number = 15
  说明:多进程互不干扰,同时进行;进程1: 0、1、2、3、4、5;进程3: 0、3、6、9、15
  实例2:多进程加锁
import multiprocessing  
import time
  

  
def add(number,value,lock):
  
    lock.acquire()      # 获取锁
  
    try:
  
      print("init,member = {1}".format(value,number))
  
      for i in xrange(1,6):
  
            number += value
  
            time.sleep(1)
  
            print("add {0},number = {1}".format(value,number))
  
    except Exception as e:
  
      raise e
  
    finally:
  
      lock.release()       # 释放锁
  

  
if __name__ == "__main__":
  
    lock = multiprocessing.Lock()# 定义锁
  
    number = 0
  

  
    p1 = multiprocessing.Process(target=add,args=(number,1,lock))
  
    p3 = multiprocessing.Process(target=add,args=(number,3,lock))
  

  
    p1.start()
  
    p3.start()
  

  
    print("main end")
  运行结果:
  main end
  init,member = 0
  add 1,number = 1
  add 1,number = 2
  add 1,number = 3
  add 1,number = 4
  add 1,number = 5
  init,member = 0
  add 3,number = 3
  add 3,number = 6
  add 3,number = 9
  add 3,number = 12
  add 3,number = 15
  说明:进程1和进程3,谁先抢到锁,则另一个进程只能等待抢到者执行完之后,才能执行
  三、共享内存
  两个“同时“读写的文件,其中一个作用的结果对另外一个有影响。multiprocessing提供了Value和Array模块
  实例1:多进程内存共享不加锁
import multiprocessing  
import time
  

  
def add(number,value1):
  
    try:
  
      print("init,member = {1}".format(value1,number.value))   # 值的调用方式 number.value
  
      for i in xrange(1,6):
  
            number.value += value1
  
            time.sleep(1)
  
            print("add {0},number = {1}".format(value1,number.value))
  
    except Exception as e:
  
      raise e
  

  
if __name__ == "__main__":
  
    lock = multiprocessing.Lock()
  
    number = multiprocessing.Value("i",0)   # Value共享内存模块
  

  
    p1 = multiprocessing.Process(target=add,args=(number,1))
  
    p3 = multiprocessing.Process(target=add,args=(number,3))
  

  
    p1.start()
  
    p3.start()
  

  
    print("main end")
  运行结果:
  main end
  init,member = 0
  init,member = 1
  add 1,number = 4
  add 3,number = 5
  add 1,number = 8
  add 3,number = 9
  add 1,number = 12
  add 3,number = 13
  add 1,number = 16
  add 3,number = 17
  add 1,number = 20
  add 3,number = 20
  说明:不加锁,进程1和进程3在彼此运算完之后的结果上继续运算,同时进行
  实例2: 多进程共享内存加锁
import multiprocessing  
import time
  

  
def add(number,value1,lock):
  
    lock.acquire()
  
    try:
  
      print("init,member = {1}".format(value1,number.value))
  
      for i in xrange(1,6):
  
            number.value += value1
  
            time.sleep(1)
  
            print("add {0},number = {1}".format(value1,number.value))
  
    except Exception as e:
  
      raise e
  
    finally:
  
      lock.release()
  

  
if __name__ == "__main__":
  
    lock = multiprocessing.Lock()
  
    number = multiprocessing.Value("i",0)
  

  
    p1 = multiprocessing.Process(target=add,args=(number,1,lock))
  
    p3 = multiprocessing.Process(target=add,args=(number,3,lock))
  

  
    p1.start()
  
    p3.start()
  

  
    print("main end")
  运行结果:
  main end
  init,member = 0
  add 1,number = 1
  add 1,number = 2
  add 1,number = 3
  add 1,number = 4
  add 1,number = 5
  init,member = 5
  add 3,number = 8
  add 3,number = 11
  add 3,number = 14
  add 3,number = 17
  add 3,number = 20
  说明:加锁,进程3等待进程1执行完毕之后,在前者的结果上,继续执行
  四、多进程Manager
  一般实现的数据共享的方式只有两种结构Value和Array。Python中提供了强大的Manage专门用来做数据共享的,其支持的类型非常多,包括,Value, Array,list,dict, Queue, Lock等
  例:支持字典和列表类型
import multiprocessing  
def worker(d,l):
  
    l += range(11,16)   # 返回一个列表序列的特殊写法
  
    for i in xrange(1,6):
  
      key = "key {0}".format(i)
  
      value = "value {0}".format(i)
  
      d = value
  

  
if __name__ == "__main__":
  
    manager = multiprocessing.Manager()
  
    l = manager.list()
  
    d = manager.dict()
  
    p = multiprocessing.Process(target=worker,args=(d,l))
  
    p.start()
  
    p.join()
  

  
    print(d)
  
    print(l)
  
    print("main end")
  运行结果:
  {'key 1': 'value 1', 'key 2': 'value 2', 'key 3': 'value 3', 'key 4': 'value 4', 'key 5': 'value 5'}
  
  main end
  
  五、进程池
  Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程
  阻塞和非阻塞
  Pool.apply_async: 非阻塞,定义的进程池进程最大数可以同时执行
  Pool.apply:阻塞,一个进程结束,释放回进程池,下一个进程才可以开始
  例1:非阻塞
import multiprocessing  
import time
  
def worker(msg):
  
    print("###### start {0}#######".format(msg))
  
    time.sleep(1)
  
    print("###### end {0}#######".format(msg))
  

  
if __name__ == "__main__":
  
    print("main start")
  
    pool = multiprocessing.Pool(processes=3)
  

  
    for i in xrange(1,10):
  
      msg = "hello {0}".format(i)
  
      pool.apply_async(func=worker,args=(msg,))# pool.apply_async()非阻塞型
  

  
    pool.close()
  
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后如果没有新的进程加入到pool,则join函数等待所有子进程结束
  
    print("main end")
  运行结果:
  main start
  ###### start hello 1#######
  ###### start hello 2#######
  ###### start hello 3#######
  ###### end hello 1#######
  ###### start hello 4#######
  ###### end hello 2#######
  ###### start hello 5#######
  ###### end hello 3#######
  ###### start hello 6#######
  ###### end hello 4#######
  ###### start hello 7#######
  ###### end hello 5#######
  ###### start hello 8#######
  ###### end hello 6#######
  ###### start hello 9#######
  ###### end hello 7#######
  ###### end hello 8#######
  ###### end hello 9#######
  main end
  说明:一开始启动3个进程,之后先关闭一个进程,再补充另外一个进程进来,始终保持3个,直至结束
  例2:阻塞型
import multiprocessing  
import time
  
def worker(msg):
  
    print("###### start {0}#######".format(msg))
  
    time.sleep(1)
  
    print("###### end {0}#######".format(msg))
  

  
if __name__ == "__main__":
  
    print("main start")
  
    pool = multiprocessing.Pool(processes=3)
  

  
    for i in xrange(1,10):
  
      msg = "hello {0}".format(i)
  
      pool.apply(func=worker,args=(msg,))   # pool.apply() 阻塞型
  

  
    pool.close()
  
    pool.join()
  
    print("main end")
  运行结果:
  main start
  ###### start hello 1#######
  ###### end hello 1#######
  ###### start hello 2#######
  ###### end hello 2#######
  ###### start hello 3#######
  ###### end hello 3#######
  ###### start hello 4#######
  ###### end hello 4#######
  ###### start hello 5#######
  ###### end hello 5#######
  ###### start hello 6#######
  ###### end hello 6#######
  ###### start hello 7#######
  ###### end hello 7#######
  ###### start hello 8#######
  ###### end hello 8#######
  ###### start hello 9#######
  ###### end hello 9#######
  main end
  说明:每次只能启动一个进程,启动新进程前,需关闭老进程
页: [1]
查看完整版本: python多进程—multiprocessing