Python multiprocessing
Posted on May 14, 2012 18:56
A simple example
```python
#!/usr/bin/env python3
from multiprocessing import Process
import os
import time


def f(name):
    time.sleep(1)
    print("Hello", name)
    print(os.getppid())  # ID of this child's parent process
    print(os.getpid())   # ID of this child process


if __name__ == '__main__':
    process_list = []
    for i in range(10):
        p = Process(target=f, args=(i,))
        p.start()
        process_list.append(p)
    # Wait for every child to finish
    for item in process_list:
        item.join()
```
Communication between processes:
Queue
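```python
#!/usr/bin/env python3
from multiprocessing import Process, Queue
import time


def f(q, name):
    time.sleep(1)
    q.put(['hello' + str(name)])


if __name__ == '__main__':
    # Create the queue in the parent and pass it to each child explicitly;
    # a module-level queue would be re-created in every child under the
    # "spawn" start method (the default on Windows and macOS)
    q = Queue()
    process_list = []
    for i in range(10):
        p = Process(target=f, args=(q, i))
        p.start()
        process_list.append(p)
    # Drain the queue before joining: a child that has put items on a queue
    # does not exit until they are flushed into the underlying pipe
    for i in range(10):
        print(q.get())
    for j in process_list:
        j.join()
```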
Pipe
```python
#!/usr/bin/env python3
from multiprocessing import Process, Pipe
import os
import time


def f(conn, name):
    time.sleep(1)
    conn.send(['hello' + str(name)])
    print(os.getppid(), '----------', os.getpid())
    conn.close()


if __name__ == '__main__':
    process_list = []
    parent_conns = []
    for i in range(10):
        # One pipe per child, so no two processes write to the same end
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn, i))
        p.start()
        process_list.append(p)
        parent_conns.append(parent_conn)
    # Receive before joining so no child blocks on a full pipe
    for conn in parent_conns:
        print(conn.recv())
    for p in process_list:
        p.join()
```
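Pipe() returns a pair of connection objects representing the two ends of the pipe. By default the pipe is duplex, so each end can both send and receive. Data in a pipe may become corrupted if two processes (or threads) read from or write to the same end of the pipe at the same time. Processes using different ends of the pipe at the same time run no such risk.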
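A minimal sketch of both modes follows, assuming Python 3. The helper names `echo_upper`, `round_trip` and `one_way_demo` are made up for illustration. In the duplex case each process owns exactly one end and uses it for both directions, so no end is ever shared. With `Pipe(duplex=False)` the first connection can only receive and the second can only send.

```python
from multiprocessing import Pipe, Process


def echo_upper(conn):
    # Child: receive one message on its end and reply on the same end
    msg = conn.recv()
    conn.send(msg.upper())
    conn.close()


def round_trip(text):
    # Duplex (the default): both ends can send and receive
    parent_end, child_end = Pipe()
    p = Process(target=echo_upper, args=(child_end,))
    p.start()
    parent_end.send(text)
    reply = parent_end.recv()
    p.join()
    return reply


def one_way_demo():
    # duplex=False: the first end can only receive, the second can only send
    recv_end, send_end = Pipe(duplex=False)
    send_end.send('ping')
    return recv_end.recv()


if __name__ == '__main__':
    print(round_trip('hello'))  # HELLO
    print(one_way_demo())       # ping
```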
Synchronization between processes
You can use a lock so that only one process prints at a time.
```python
#!/usr/bin/env python3
from multiprocessing import Process, Lock
import os
import time


def f(lock, name):
    time.sleep(1)
    # Only one process at a time can be inside this block
    with lock:
        print("hello--" + str(name))
        print(os.getppid(), '----------', os.getpid())


if __name__ == '__main__':
    # Pass the lock to each child; a module-level Lock() would be a separate,
    # unshared lock in every child under the "spawn" start method
    lock = Lock()
    process_list = []
    for i in range(10):
        p = Process(target=f, args=(lock, i))
        p.start()
        process_list.append(p)
    for j in process_list:
        j.join()
```
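The sketch below shows the mutual exclusion directly, using hypothetical helpers `try_lock` and `lock_is_exclusive`. The parent holds the lock while a child calls `acquire(timeout=0.2)`, so the child gives up and reports `False`. After the parent releases the lock, a second child acquires it and reports `True`.

```python
from multiprocessing import Lock, Process, Queue


def try_lock(lock, results):
    # Child: try to take the lock, giving up after 0.2 seconds
    got_it = lock.acquire(timeout=0.2)
    if got_it:
        lock.release()
    results.put(got_it)


def lock_is_exclusive():
    lock = Lock()
    results = Queue()

    lock.acquire()                 # the parent holds the lock,
    p = Process(target=try_lock, args=(lock, results))
    p.start()
    while_held = results.get()     # so the child's timed acquire fails
    p.join()
    lock.release()

    p = Process(target=try_lock, args=(lock, results))
    p.start()
    after_release = results.get()  # now the child can take the lock
    p.join()
    return while_held, after_release


if __name__ == '__main__':
    print(lock_is_exclusive())     # (False, True)
```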
Sharing state between processes
Shared state is best avoided as far as possible, but sometimes you really do need it.
1. Shared memory
You can store data in a shared memory map using Value or Array.
```python
#!/usr/bin/env python3
from multiprocessing import Process, Value, Array
import time


def f(n, a, name):
    time.sleep(1)
    n.value = name * name   # every child overwrites the same shared double
    for i in range(len(a)):
        a[i] = -i


if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    process_list = []
    for i in range(10):
        p = Process(target=f, args=(num, arr, i))
        p.start()
        process_list.append(p)
    for j in process_list:
        j.join()
    print(num.value)   # value written by whichever child ran last
    print(list(arr))   # [0, -1, -2, ..., -9]
```
The 'd' and 'i' arguments are the typecodes used when creating num and arr: 'd' means a double-precision float and 'i' means a signed integer.
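By default, `Value` and `Array` are created with a lock. A read-modify-write such as `counter.value += 1` is only safe while that lock, obtained with `get_lock()`, is held. The sketch below uses hypothetical helpers `add_many`, `count_with_processes` and `typecode_demo`. Without the `with counter.get_lock():` line, two processes could read the same old value and one increment would be lost, so the total could come out below `n_procs * n`.

```python
from multiprocessing import Array, Process, Value


def add_many(counter, n):
    for _ in range(n):
        # "+=" reads and then writes; holding the lock makes the pair atomic
        with counter.get_lock():
            counter.value += 1


def count_with_processes(n_procs=4, n=1000):
    counter = Value('i', 0)    # 'i': signed int, starts at 0
    procs = [Process(target=add_many, args=(counter, n)) for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


def typecode_demo():
    x = Value('d', 1.5)          # 'd': double-precision float
    arr = Array('i', [1, 2, 3])  # 'i': array of signed ints
    return x.value, arr[:]


if __name__ == '__main__':
    print(count_with_processes())  # 4000
    print(typecode_demo())         # (1.5, [1, 2, 3])
```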
For more flexible shared memory, use the multiprocessing.sharedctypes module, which can allocate arbitrary ctypes objects from shared memory.
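The following sketch is similar to the example in the Python documentation and uses a hypothetical `Point` structure. Unlike the one-character typecodes above, `multiprocessing.sharedctypes.Array` also accepts a ctypes type, so an array of C structs can live in shared memory. Here one explicit `Lock` is passed as `lock=` to guard the whole array. The function names `square_all` and `run_sharedctypes_demo` are illustrative.

```python
from ctypes import Structure, c_double
from multiprocessing import Lock, Process
from multiprocessing.sharedctypes import Array, Value


class Point(Structure):
    # A C struct with two doubles, stored directly in shared memory
    _fields_ = [('x', c_double), ('y', c_double)]


def square_all(n, points):
    n.value **= 2
    for pt in points:
        # pt shares memory with the array, so these writes are visible to the parent
        pt.x **= 2
        pt.y **= 2


def run_sharedctypes_demo():
    lock = Lock()
    n = Value('i', 7)
    # The explicit lock guards every element access on this array
    points = Array(Point, [(1.875, -6.25), (-5.75, 2.0)], lock=lock)
    p = Process(target=square_all, args=(n, points))
    p.start()
    p.join()
    return n.value, [(pt.x, pt.y) for pt in points]


if __name__ == '__main__':
    print(run_sharedctypes_demo())  # (49, [(3.515625, 39.0625), (33.0625, 4.0)])
```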
2. Server process
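Manager() returns a manager object that controls a server process. The server process holds Python objects and allows other processes to manipulate them through proxies.
A manager returned by Manager() supports the types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.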
```python
#!/usr/bin/env python3
from multiprocessing import Process, Manager
import time


def f(d, name):
    time.sleep(1)
    d[name] = name * name   # forwarded through the proxy to the server process
    print(d)


if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()      # a proxy to a dict held by the manager's server process
    process_list = []
    for i in range(10):
        p = Process(target=f, args=(d, i))
        p.start()
        process_list.append(p)
    for j in process_list:
        j.join()
    print(d)
```
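The next sketch combines three of the manager-supported types listed above: `list`, `Namespace` and `Lock`. The function names `record` and `run_manager_demo` are hypothetical. Each child appends to the shared list and bumps a counter kept on the `Namespace`. That bump is a remote get followed by a remote set, so it is guarded with a manager `Lock`. The results are copied out before the `with` block shuts the manager down.

```python
from multiprocessing import Manager, Process


def record(results, ns, lock, name):
    results.append(name * name)
    # "+=" on a Namespace attribute is a get followed by a set, so guard it
    with lock:
        ns.count += 1


def run_manager_demo():
    with Manager() as manager:
        results = manager.list()
        ns = manager.Namespace()
        ns.count = 0
        lock = manager.Lock()
        procs = [Process(target=record, args=(results, ns, lock, i))
                 for i in range(5)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Copy the data out before the manager's server process exits
        return sorted(results[:]), ns.count


if __name__ == '__main__':
    print(run_manager_demo())  # ([0, 1, 4, 9, 16], 5)
```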
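Server process managers are more flexible than shared memory objects because they can support arbitrary object types. In addition, a single manager can be shared by processes running on different computers over a network. They are, however, slower than shared memory.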
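For the network case, `multiprocessing.managers.BaseManager` can serve objects over TCP. The sketch below uses a hypothetical `StoreManager` class that serves a single dict through a hypothetical `get_store` function. To stay runnable on one machine it binds to loopback and lets the OS pick the port. A client on another computer would instead use the server's reachable host name or IP address and the same `authkey`; the `authkey` value shown is only a placeholder.

```python
from multiprocessing.managers import BaseManager

_store = {}


def get_store():
    # Runs in the manager's server process and returns the dict it serves
    return _store


class StoreManager(BaseManager):
    pass


StoreManager.register('get_store', callable=get_store)


def remote_round_trip():
    # Port 0 lets the OS choose a free port; start() reports the real address back
    server = StoreManager(address=('127.0.0.1', 0), authkey=b'change-me')
    server.start()
    try:
        # A second manager object connects to the running server, as a
        # process on another computer would
        client = StoreManager(address=server.address, authkey=b'change-me')
        client.connect()
        store = client.get_store()   # proxy to the dict in the server process
        store.update(answer=42)
        return store.get('answer')
    finally:
        server.shutdown()


if __name__ == '__main__':
    print(remote_round_trip())  # 42
```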
Using a pool of workers
The Pool class represents a pool of worker processes. It has methods that allow tasks to be offloaded to the worker processes in a few different ways.
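The sketch below shows three of those ways; the function names `square` and `pool_demo` are illustrative. `map` blocks until all results are ready and keeps the input order. `apply_async` returns an `AsyncResult` at once, and its `get()` waits for the value. `imap_unordered` yields results as workers finish them, in no particular order, so they are sorted here before comparing. Leaving the `with` block calls `terminate()` on the pool, which is why all results are collected inside it.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def pool_demo():
    with Pool(processes=4) as pool:
        # map: blocks until done and keeps the input order
        ordered = pool.map(square, range(10))
        # apply_async: returns an AsyncResult immediately; get() waits for it
        single = pool.apply_async(square, (7,)).get(timeout=10)
        # imap_unordered: yields results as workers finish, in any order
        unordered = sorted(pool.imap_unordered(square, range(5)))
    return ordered, single, unordered


if __name__ == '__main__':
    print(pool_demo())
```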