Using threading
#!/usr/bin/env python3
import optparse
import os
import queue
import threading

BLOCK_SIZE = 8000


class Worker(threading.Thread):

    def __init__(self, work_queue, word, number):
        super().__init__()
        self.work_queue = work_queue
        self.word = word
        self.number = number

    def run(self):
        while True:
            try:
                filename = self.work_queue.get()
                self.process(filename)
            finally:
                self.work_queue.task_done()

    def process(self, filename):
        previous = ""
        try:
            with open(filename, "rb") as fh:
                while True:
                    current = fh.read(BLOCK_SIZE)
                    if not current:
                        break
                    current = current.decode("utf8", "ignore")
                    # also check across the block boundary so a word split
                    # between two blocks is still found
                    if (self.word in current or
                            self.word in previous[-len(self.word):] +
                            current[:len(self.word)]):
                        print("{0}{1}".format(self.number, filename))
                        break
                    if len(current) != BLOCK_SIZE:
                        break
                    previous = current
        except EnvironmentError as err:
            print("{0}{1}".format(self.number, err))


def main():
    opts, word, args = parse_options()
    filelist = get_files(args, opts.recurse)
    work_queue = queue.Queue()
    for i in range(opts.count):
        number = "{0}: ".format(i + 1) if opts.debug else ""
        worker = Worker(work_queue, word, number)
        worker.daemon = True    # daemon threads die when the main thread exits
        worker.start()
    for filename in filelist:
        work_queue.put(filename)
    work_queue.join()


def parse_options():
    parser = optparse.OptionParser(
            usage=("usage: %prog [options] word name1 "
                   "[name2 [... nameN]]\n\n"
                   "names are filenames or paths; paths only "
                   "make sense with the -r option set"))
    parser.add_option("-t", "--threads", dest="count", default=7,
            type="int",
            help=("the number of threads to use (1..20) "
                  "[default %default]"))
    parser.add_option("-r", "--recurse", dest="recurse",
            default=False, action="store_true",
            help="recurse into subdirectories")
    parser.add_option("-d", "--debug", dest="debug", default=False,
            action="store_true")
    opts, args = parser.parse_args()
    if len(args) == 0:
        parser.error("a word and at least one path must be specified")
    elif len(args) == 1:
        parser.error("at least one path must be specified")
    if (not opts.recurse and
            not any([os.path.isfile(arg) for arg in args])):
        parser.error("at least one file must be specified; or use -r")
    if not (1 <= opts.count <= 20):
        parser.error("thread count must be 1..20")
    return opts, args[0], args[1:]


def get_files(args, recurse):
    filelist = []
    for path in args:
        if os.path.isfile(path):
            filelist.append(path)
        elif recurse:
            for root, dirs, files in os.walk(path):
                for filename in files:
                    filelist.append(os.path.join(root, filename))
    return filelist


main()
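The listing hinges on a small producer/consumer idiom: daemon worker threads block on get(), acknowledge each item with task_done(), and the main thread's work_queue.join() returns once every queued item has been acknowledged. The following stripped-down sketch (the names and the print-based "work" are illustrative, not part of the listing) shows just that idiom:

import queue
import threading


def worker(work_queue):
    while True:
        item = work_queue.get()           # blocks until an item arrives
        try:
            print("processing", item)     # stand-in for real work
        finally:
            work_queue.task_done()        # one task_done() per get()


work_queue = queue.Queue()
for _ in range(3):                        # three daemon worker threads
    threading.Thread(target=worker, args=(work_queue,), daemon=True).start()

for item in range(10):                    # the work load
    work_queue.put(item)

work_queue.join()                         # returns once every item is acknowledged

Because the workers are daemon threads, the program can simply exit after join() even though the workers' infinite loops never return.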
Using multiprocessing
#!/usr/bin/env python3
import optparse
import os
import multiprocessing

BLOCK_SIZE = 8000


class Worker(multiprocessing.Process):

    def __init__(self, work_queue, word, number):
        super().__init__()
        self.work_queue = work_queue
        self.word = word
        self.number = number

    def run(self):
        while True:
            try:
                filename = self.work_queue.get()
                self.process(filename)
            finally:
                self.work_queue.task_done()

    def process(self, filename):
        previous = ""
        try:
            with open(filename, "rb") as fh:
                while True:
                    current = fh.read(BLOCK_SIZE)
                    if not current:
                        break
                    current = current.decode("utf8", "ignore")
                    # also check across the block boundary so a word split
                    # between two blocks is still found
                    if (self.word in current or
                            self.word in previous[-len(self.word):] +
                            current[:len(self.word)]):
                        print("{0}{1}".format(self.number, filename))
                        break
                    if len(current) != BLOCK_SIZE:
                        break
                    previous = current
        except EnvironmentError as err:
            print("{0}{1}".format(self.number, err))


def main():
    opts, word, args = parse_options()
    filelist = get_files(args, opts.recurse)
    # the only structural changes from the threading version: a
    # JoinableQueue instead of queue.Queue, and Process-based workers
    work_queue = multiprocessing.JoinableQueue()
    for i in range(opts.count):
        number = "{0}: ".format(i + 1) if opts.debug else ""
        worker = Worker(work_queue, word, number)
        worker.daemon = True    # daemon processes are terminated with the parent
        worker.start()
    for filename in filelist:
        work_queue.put(filename)
    work_queue.join()


def parse_options():
    parser = optparse.OptionParser(
            usage=("usage: %prog [options] word name1 "
                   "[name2 [... nameN]]\n\n"
                   "names are filenames or paths; paths only "
                   "make sense with the -r option set"))
    parser.add_option("-t", "--threads", dest="count", default=7,
            type="int",
            help=("the number of threads to use (1..20) "
                  "[default %default]"))
    parser.add_option("-r", "--recurse", dest="recurse",
            default=False, action="store_true",
            help="recurse into subdirectories")
    parser.add_option("-d", "--debug", dest="debug", default=False,
            action="store_true")
    opts, args = parser.parse_args()
    if len(args) == 0:
        parser.error("a word and at least one path must be specified")
    elif len(args) == 1:
        parser.error("at least one path must be specified")
    if (not opts.recurse and
            not any([os.path.isfile(arg) for arg in args])):
        parser.error("at least one file must be specified; or use -r")
    if not (1 <= opts.count <= 20):
        parser.error("thread count must be 1..20")
    return opts, args[0], args[1:]


def get_files(args, recurse):
    filelist = []
    for path in args:
        if os.path.isfile(path):
            filelist.append(path)
        elif recurse:
            for root, dirs, files in os.walk(path):
                for filename in files:
                    filelist.append(os.path.join(root, filename))
    return filelist


if __name__ == "__main__":    # keeps spawn-started child processes from re-running main()
    main()
By using forking (on systems that support it, such as UNIX) or child processes (on systems that do not, such as Windows), the multiprocessing module can provide much the same functionality as threads. As a result, locking is not always necessary, and the processes can run on whichever processor cores the operating system makes available. The package offers several ways of passing data between processes, including queues, which can be used to supply work loads to processes in just the same way that queue.Queue can be used to supply work loads to threads.
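As a minimal sketch of that point (the worker function and the trivial work load are illustrative assumptions, not taken from the listing), the same hand-off works with a multiprocessing.JoinableQueue feeding daemon processes:

import multiprocessing


def worker(work_queue):
    while True:
        item = work_queue.get()           # blocks until an item arrives
        try:
            print("processing", item)     # stand-in for real work
        finally:
            work_queue.task_done()        # one task_done() per get()


if __name__ == "__main__":                # required on platforms that spawn rather than fork
    work_queue = multiprocessing.JoinableQueue()
    for _ in range(3):                    # three daemon worker processes
        multiprocessing.Process(target=worker,
                                args=(work_queue,), daemon=True).start()
    for item in range(10):                # the work load
        work_queue.put(item)
    work_queue.join()                     # returns once every item is acknowledged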
The chief benefit of the multiprocessing version is that, on multicore machines, it has the potential to run faster than the threaded version, because it can run its processes on any available cores. The standard Python interpreter (written in C and often called CPython) has a GIL (Global Interpreter Lock), which means that at any one moment only one thread can be executing Python code. This constraint is an implementation detail and does not necessarily apply to other Python interpreters, such as Jython.
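One rough way to observe the effect (the work load, the worker count of four, and any resulting timings are illustrative assumptions; actual numbers depend on the machine and interpreter) is to time the same CPU-bound function under both modules:

import multiprocessing
import threading
import time


def burn_cpu(n=5_000_000):
    # pure-Python arithmetic: under CPython the GIL serialises this across threads
    total = 0
    for i in range(n):
        total += i * i
    return total


def timed(make_worker, label):
    workers = [make_worker() for _ in range(4)]
    start = time.perf_counter()
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print(label, round(time.perf_counter() - start, 2), "seconds")


if __name__ == "__main__":
    timed(lambda: threading.Thread(target=burn_cpu), "4 threads:  ")
    timed(lambda: multiprocessing.Process(target=burn_cpu), "4 processes:")

Under CPython the threaded run is serialised by the GIL, so on a multicore machine the process-based run can be expected to finish in a fraction of the time; an interpreter without a GIL would not show the same gap.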