Python multiprocessing 使用手记[3] – 关于Queue
继续讨论Python multiprocessing,这次讨论的主要内容是mp库的核心组件之一的Queue。
Queue是mp库当中用来提供多进程对象交换的方式。对象交换和上一部分当中提到的对象共享都是使多个进程访问同一个对象的方式,两者的区别就是,对象共享是多个进程访问同一个对象,对象交换则是将对象从一个进程传输的另一个进程。
multiprocessing当中的Queue使用方式和Python内置的threading.Queue对象很像,它支持一个put操作,将 对象放入Queue,也支持一个get操作,将对象从Queue当中读出。和threading.Queue不同的是,mp.Queue默认不支持 join()和task_done操作,这两个支持需要使用mp.JoinableQueue对象。
由于Queue对象负责进程之间的对象传输,因此第一个问题就是如何在两个进程之间共享这个Queue对象本身。在上一部分所言的三种共享方式当 中,Queue对象只能使用继承(inheritance)的方式共享。这是因为Queue本身基于unix的Pipe对象实现,而Pipe对象的共享需 要通过继承。因此,在一个典型的应用实现模型当中,应该是父进程创建Queue,然后创建子进程共享该Queue,由父进程和子进程分别读写。例如下面的 这个例子:
import multiprocessing q = multiprocessing.Queue() def reader_proc(): print q.get() reader = multiprocessing.Process(target=reader_proc) reader.start() q.put(100) reader.join()
另一种实现方式是父进程创建Queue,创建多个子进程,有的子进程读Queue,有的子进程写Queue,例如:
import multiprocessing q = multiprocessing.Queue() def writer_proc(): q.put(100) def reader_proc(): print q.get() reader = multiprocessing.Process(target=reader_proc) reader.start() writer = multiprocessing.Process(target=writer_proc) writer.start() reader.join() writer.join()
由于使用继承的方式共享Queue,因此代码当中并没有明显的传输Queue对象本身的代码,看起来似乎只要将multiprocessing当中 的对象换成threading当中的对象,程序仍然能够工作。反之,拿到一个现有的多线程程序,是不是将threading改成 multiprocessing就可以工作呢?也许可以,但是更可能的情况是你会遇到很多问题。
第一个问题就是mp的Queue需要考虑多进程之间的对象传输,因此所传输的对象必须是可以pickle的。否则,在Queue的put操作上会抛出PicklingError。
其他的一些差异表现在一些技术细节上,这些不是任何高层逻辑可以抽象掉的,不知道这些差异会导致一些潜在的错误,例如死锁。在总结这些潜在的犯错的 可能的同时,我们会简单看一下mp当中Queue的实现方式,以便能够方便的理解为什么会有这样的行为。这些实现问题仅仅针对Linux,Windows 上面的实现和出现的问题在这里不涉及。
mp.Queue建构在系统的Pipe之上,但是实际上进程并不是直接将对象写入到Pipe里面,而是先写入一个本地的buffer,再由一个专门 的feed线程将其放入Pipe当中。读取端则是直接从Pipe当中读出对象。之所以有这样一个feed线程,是为了能够提供Queue接口函数所需要的 put的超时控制。但是由于这个feed线程的存在,mp.Queue提供了几个额外的函数来控制它,一个函数close来停止该线程,以及 join_thread来join该线程。close同时负责把所有在buffer当中的对象刷新到Pipe当中。
但是这个feed线程也是个麻烦制造者,为了保证所有被放入Queue的东西最终都能够到达另外一端的进程,mp库注册了一个atexit的处理函 数,用来在进程退出的时候自动close并且join该feed线程。这个join动作带来了很多问题,比如潜在的死锁。考虑下面一种状况:一个父进程创 建了两个子进程,一个子进程读,另一个子进程写。当需要停止这些进程的时候,父进程如果先把读进程结束,但是同时写进程已经将太多的对象写入Queue, 导致后继的对象等待在buffer当中,则这个进程将无法终止,因为atexit的处理函数等待把所有buffer当中的对象放入Pipe,但是Pipe 已经满了,然后陷入了死锁。
有人可能会问,那只要保证总是按照数据流的顺序来停止进程不就行。问题是在很多复杂的系统流程当中,可能存在一个环形的数据流,这种情况下,无论按照什么顺序停止进程,终究有一个进程可能陷入这种情景当中。
幸运的是,Queue对象还提供了一个成员函数cancel_join_thread,这个函数可以使得在进程停止的时候不进行join操作,这样 可以避免死锁,代价就是这个时候尚未刷新到Pipe当中的对象都会丢失。鉴于即使调用了join_thread,残留在Pipe当中的对象仍然可能丢失, 所以一旦选择使用mp的Queue对象,就不要假设不会在流程当中丢对象了。
另外一个可能的方案是使用mp库当中的SimpleQueue对象。这个对象在文档当中没有提及,但是在multiprocessing.queue模块当中有定义。这个对象就是去掉了buffer的Queue对象,因此可能能够避免上面说的问题的。但是SimpleQueue没有提供put和get的超时处理,两个动作都是阻塞的。
除了使用multiprocessing.Queue,还可以使用multiprocessing.Pipe进行通信。mp.Pipe是Queue 的底层结构,但是没有feed线程和put/get的超时控制。一定程度上和SimpleQueue很像。需要注意的是Pipe带有一个参数 duplex,当设置为True(默认)的时候,Pipe并不是使用系统的pipe来实现,而是通过socketpair,即Unix Domain Socket来实现。这个和pipe相比有些微的性能差异。
另外一个使用Queue的方式不是mp库内置的。这种方式使用上一篇文章当中提到的server process的方式来共享一个Queue对象。这个Queue对象实际上在server process当中,所有的子进程通过socket连接到server process获取该Queue的代理对象进行操作。说到这有人会想起来mp库有一个内置的SyncManager对象, 可以通过multiprocess.Manager函数获取到,通过该对象的Queue方法可以获取一个Queue的代理对象。不幸的是,这个方法不是正 确的获取Queue的方式,原因正如上一篇文章所说,SyncManager.Queue方法的每次调用获取到的是一个新建对象的代理对象,而不是一个共 享对象。正确的使用server process当中的Queue的方式是:
共同部分:
import multiprocessing.managers as mpm import Queue class SharedQueueManager(mpm.BaseManager): pass q = Queue.Queue() SharedQueueManager.register('Queue', lambda: q)
服务进程:
mgr = SharedQueueManager(address=('', 12345)) server = mgr.get_server() server.serve_forever()
客户进程:
mgr = SharedQueueManager(address=('localhost', 12345)) mgr.connect() q = mgr.Queue() # 这里q就是共享的Queue对象的代理对象
这种方式比起mp库内置的Queue,有一些性能上的影响,因为毕竟牵涉到多次网络通讯,但是带来的好处是没有feed线程带来的一系列问题,而且 理论上不会存在丢数据的问题,除非server process崩溃。但是正如上一篇所说,server process本身就不是很靠谱的,因此这里也只是“理论上”不会丢数据而已。
说到性能,这里就列两个性能数据,以前在twitter上面提到过的(这两个连接无法访问的请联系我):
操作对象为 pickle后512字节的对象,通过proxy操作Queue的性能大约是7000次/秒(本机)或1100次/秒(多机),如果使用 multiprocessing.Queue,效率可达54000次/秒。