Python将工作分布到多个线程
erhuabushuo
posted @ 2012年3月04日 03:50
in Python
, 3323 阅读
使用threading
#!/usr/bin/env python3
"""Search files for a word using a pool of daemon worker threads.

Usage: %prog [options] word name1 [name2 [... nameN]]
Each worker pulls filenames from a shared queue and prints the names of
files containing the word.
"""
import optparse
import os
import queue
import threading

BLOCK_SIZE = 8000  # bytes read from a file per chunk


class Worker(threading.Thread):
    """Daemon thread: repeatedly take a filename off the queue and grep it."""

    def __init__(self, work_queue, word, number):
        super().__init__()
        self.work_queue = work_queue
        self.word = word
        self.number = number  # "N: " debug prefix, or "" when not debugging

    def run(self):
        while True:
            # get() blocks until work arrives; keep it outside the try so
            # task_done() is only ever called after a successful get().
            filename = self.work_queue.get()
            try:
                self.process(filename)
            finally:
                # Always balance the get() so work_queue.join() can return.
                self.work_queue.task_done()

    def process(self, filename):
        """Print the (prefixed) filename if it contains self.word.

        Reads in BLOCK_SIZE chunks; `previous` holds the prior chunk so a
        word straddling a chunk boundary is still detected.
        """
        previous = ""
        try:
            with open(filename, "rb") as fh:
                while True:
                    current = fh.read(BLOCK_SIZE)
                    if not current:
                        break
                    current = current.decode("utf8", "ignore")
                    if (self.word in current or
                        self.word in previous[-len(self.word):] +
                                     current[:len(self.word)]):
                        print("{0}{1}".format(self.number, filename))
                        break
                    if len(current) != BLOCK_SIZE:
                        break  # short read: end of file
                    previous = current
        except EnvironmentError as err:
            print("{0}{1}".format(self.number, err))


def main():
    """Start the worker threads, queue every file, wait for completion."""
    opts, word, args = parse_options()
    filelist = get_files(args, opts.recurse)
    work_queue = queue.Queue()
    for i in range(opts.count):
        number = "{0}: ".format(i + 1) if opts.debug else ""
        worker = Worker(work_queue, word, number)
        worker.daemon = True  # allow exit even if a worker blocks on get()
        worker.start()
    for filename in filelist:
        work_queue.put(filename)
    work_queue.join()  # block until every queued file has been processed


def parse_options():
    """Parse the command line; return (opts, word, paths).

    Exits via parser.error() on invalid usage.
    """
    parser = optparse.OptionParser(
        usage=("usage: %prog [options] word name1 "
               "[name2 [... nameN]]\n\n"
               "names are filenames or paths: paths only "
               "make sense with the -r option set"))
    parser.add_option("-t", "--threads", dest="count", default=7,
                      type="int",
                      help=("the number of threads to use(1..20) "
                            "[default %default]"))
    parser.add_option("-r", "--recurse", dest="recurse",
                      default=False, action="store_true",
                      help="recurse into subdirectories")
    parser.add_option("-d", "--debug", dest="debug",
                      default=False, action="store_true")
    opts, args = parser.parse_args()
    if len(args) == 0:
        parser.error("a word and at least one path must be specified")
    elif len(args) == 1:
        parser.error("at least one path must be specified")
    if (not opts.recurse and
            not any(os.path.isfile(arg) for arg in args)):
        # BUG FIX: original called parser.add(), which does not exist.
        parser.error("at least one file must be specified: or use -r")
    if not (1 <= opts.count <= 20):
        parser.error("thread count must be 1..20")
    return opts, args[0], args[1:]


def get_files(args, recurse):
    """Return the files named by args; walk directories when recurse is set."""
    filelist = []
    for path in args:
        if os.path.isfile(path):
            filelist.append(path)
        elif recurse:
            for root, dirs, files in os.walk(path):
                for filename in files:
                    filelist.append(os.path.join(root, filename))
    return filelist


if __name__ == "__main__":  # guard so importing the module has no side effects
    main()
使用multiprocessing
#!/usr/bin/env python3
"""Search files for a word using a pool of worker processes.

Usage: %prog [options] word name1 [name2 [... nameN]]
Same behavior as the threaded version, but workers are separate
processes fed by a multiprocessing.JoinableQueue, so they can run on
any available CPU core.
"""
import optparse
import os
import multiprocessing

BLOCK_SIZE = 8000  # bytes read from a file per chunk


class Worker(multiprocessing.Process):
    """Daemon process: repeatedly take a filename off the queue and grep it."""

    def __init__(self, work_queue, word, number):
        super().__init__()
        self.work_queue = work_queue
        self.word = word
        self.number = number  # "N: " debug prefix, or "" when not debugging

    def run(self):
        while True:
            # get() blocks until work arrives; keep it outside the try so
            # task_done() is only ever called after a successful get().
            filename = self.work_queue.get()
            try:
                self.process(filename)
            finally:
                # Always balance the get() so work_queue.join() can return.
                self.work_queue.task_done()

    def process(self, filename):
        """Print the (prefixed) filename if it contains self.word.

        Reads in BLOCK_SIZE chunks; `previous` holds the prior chunk so a
        word straddling a chunk boundary is still detected.
        """
        previous = ""
        try:
            with open(filename, "rb") as fh:
                while True:
                    current = fh.read(BLOCK_SIZE)
                    if not current:
                        break
                    current = current.decode("utf8", "ignore")
                    if (self.word in current or
                        self.word in previous[-len(self.word):] +
                                     current[:len(self.word)]):
                        print("{0}{1}".format(self.number, filename))
                        break
                    if len(current) != BLOCK_SIZE:
                        break  # short read: end of file
                    previous = current
        except EnvironmentError as err:
            print("{0}{1}".format(self.number, err))


def main():
    """Start the worker processes, queue every file, wait for completion."""
    opts, word, args = parse_options()
    filelist = get_files(args, opts.recurse)
    work_queue = multiprocessing.JoinableQueue()
    for i in range(opts.count):
        number = "{0}: ".format(i + 1) if opts.debug else ""
        worker = Worker(work_queue, word, number)
        worker.daemon = True  # allow exit even if a worker blocks on get()
        worker.start()
    for filename in filelist:
        work_queue.put(filename)
    work_queue.join()  # block until every queued file has been processed


def parse_options():
    """Parse the command line; return (opts, word, paths).

    Exits via parser.error() on invalid usage.
    """
    parser = optparse.OptionParser(
        usage=("usage: %prog [options] word name1 "
               "[name2 [... nameN]]\n\n"
               "names are filenames or paths: paths only "
               "make sense with the -r option set"))
    parser.add_option("-t", "--threads", dest="count", default=7,
                      type="int",
                      help=("the number of threads to use(1..20) "
                            "[default %default]"))
    parser.add_option("-r", "--recurse", dest="recurse",
                      default=False, action="store_true",
                      help="recurse into subdirectories")
    parser.add_option("-d", "--debug", dest="debug",
                      default=False, action="store_true")
    opts, args = parser.parse_args()
    if len(args) == 0:
        parser.error("a word and at least one path must be specified")
    elif len(args) == 1:
        parser.error("at least one path must be specified")
    if (not opts.recurse and
            not any(os.path.isfile(arg) for arg in args)):
        # BUG FIX: original called parser.add(), which does not exist.
        parser.error("at least one file must be specified: or use -r")
    if not (1 <= opts.count <= 20):
        parser.error("thread count must be 1..20")
    return opts, args[0], args[1:]


def get_files(args, recurse):
    """Return the files named by args; walk directories when recurse is set."""
    filelist = []
    for path in args:
        if os.path.isfile(path):
            filelist.append(path)
        elif recurse:
            for root, dirs, files in os.walk(path):
                for filename in files:
                    filelist.append(os.path.join(root, filename))
    return filelist


# BUG FIX: the __main__ guard is required for multiprocessing — on
# spawn-based platforms (e.g. Windows) each child re-imports this module,
# and an unguarded main() call would recurse/raise.
if __name__ == "__main__":
    main()
通过使用forking(在支持该机制的系统上,比如UNIX)或子进程(在那些不支持forking的系统上,比如Windows),multiprocessing模块可以提供线程类似的功能,因此,锁机制并不总是必须的,并且进程可以运行在操作系统支持的任何处理器核上。该包提供了几种在进程之间传递数据的方式,包括使用队列——可用于为进程提供工作载荷,就像queue.Queue可用于为线程提供工作载荷一样。
multiprocessing版本的主要好处是,在多核机器上,具有比线程化版本运行更快的潜力,因为这一版本可以在任何可用的处理器核上运行其进程。标准的Python解释器(使用C编写,有时候称为CPython)有一个GIL(全局解释器锁),这意味着,在任何时刻,只有一个线程可以执行Python代码。这一约束是一种实现上的细节,并不必然适用于其他Python解释器,比如Jython。