
Distributing Work Across Multiple Threads in Python

erhuabushuo posted @ 2012-03-04 03:50 in Python

Using threading

 

#!/usr/bin/env python3

import optparse
import os
import queue
import threading

BLOCK_SIZE = 8000

class Worker(threading.Thread):
    
    def __init__(self, work_queue, word, number):
        super().__init__()
        self.work_queue = work_queue
        self.word = word
        self.number = number 

    def run(self):
        while True:
            try:
                # Block until a filename is available, then search it.
                filename = self.work_queue.get()
                self.process(filename)
            finally:
                # Always mark the item as done so work_queue.join() can return.
                self.work_queue.task_done()
    
    def process(self, filename):
        previous = ""
        try:
            with open(filename, "rb") as fh:
                while True:
                    current = fh.read(BLOCK_SIZE)
                    if not current:
                        break
                    current = current.decode("utf8", "ignore")
                    # Also check across the block boundary, in case the word
                    # is split between the previous block and this one.
                    if (self.word in current or
                        self.word in previous[-len(self.word):] +
                                     current[:len(self.word)]):
                        print("{0}{1}".format(self.number, filename))
                        break
                    if len(current) != BLOCK_SIZE:
                        break
                    previous = current
        except EnvironmentError as err:
            print("{0}{1}".format(self.number, err))

    
def main():
    opts, word, args = parse_options()
    filelist = get_files(args, opts.recurse)
    work_queue = queue.Queue()
    for i in range(opts.count):
        number = "{0}: ".format(i + 1) if opts.debug else ""
        worker = Worker(work_queue, word, number)
        worker.daemon = True
        worker.start()
    for filename in filelist:
        work_queue.put(filename)
    work_queue.join()
        

def parse_options():
    parser = optparse.OptionParser(
            usage=("usage: %prog [options] word name1 "
                   "[name2 [... nameN]\n\n"
                   "names are filenames or paths: paths only "
                   "make sense with the -r option set"))
    parser.add_option("-t", "--threads", dest="count", default=7,
            type="int",
            help=("the number of threads to use(1..20) "
                  "[default %default]"))
    parser.add_option("-r", "--recurse", dest="recurse",
            default=False, action="store_true",
            help="recurse into subdirectories")
    parser.add_option("-d", "--debug", dest="debug", default=False,
                    action="store_true")
    opts, args = parser.parse_args()
    if len(args) == 0:
        parser.error("a word and at least one path must be specified")
    elif len(args) == 1:
        parser.error("at least one path must be specified")
    if (not opts.recurse and
        not any([os.path.isfile(arg) for arg in args])):
        parser.add("at least one file must be specified: or use -r")
    if not (1 <= opts.count <=20):
        parser.error("thread count must be 1..20")
    return opts, args[0], args[1:]

def get_files(args, recurse):
    filelist = []
    for path in args:
        if os.path.isfile(path):
            filelist.append(path)
        elif recurse:
            for root, dirs, files in os.walk(path):
                for filename in files:
                    filelist.append(os.path.join(root, filename))
    return filelist

main()
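As parse_options() defines it, the script takes a word followed by one or more file or directory names. As an illustration (the filename grepword-t.py and the search path below are hypothetical, not given in the post), a recursive search using four threads with per-thread debug prefixes could be run as:

python3 grepword-t.py -t 4 -r -d import /path/to/source/tree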

 

Using multiprocessing

#!/usr/bin/env python3

import optparse
import os
import multiprocessing

BLOCK_SIZE = 8000

class Worker(multiprocessing.Process):
    
    def __init__(self, work_queue, word, number):
        super().__init__()
        self.work_queue = work_queue
        self.word = word
        self.number = number 

    def run(self):
        while True:
            try:
                filename = self.work_queue.get()
                self.process(filename)
            finally:
                self.work_queue.task_done()
    
    def process(self, filename):
        previous = ""
        try:
            with open(filename, "rb") as fh:
                while True:
                    current = fh.read(BLOCK_SIZE)
                    if not current:
                        break
                    current = current.decode("utf8", "ignore")
                    if (self.word in current or
                        self.word in previous[-len(self.word):] +
                                     current[:len(self.word)]):
                        print("{0}{1}".format(self.number, filename))
                        break
                    if len(current) != BLOCK_SIZE:
                        break
                    previous = current
        except EnvironmentError as err:
            print("{0}{1}".format(self.number, err))

    
def main():
    opts, word, args = parse_options()
    filelist = get_files(args, opts.recurse)
    work_queue = multiprocessing.JoinableQueue()
    for i in range(opts.count):
        number = "{0}: ".format(i + 1) if opts.debug else ""
        worker = Worker(work_queue, word, number)
        worker.daemon = True
        worker.start()
    for filename in filelist:
        work_queue.put(filename)
    work_queue.join()
        

def parse_options():
    parser = optparse.OptionParser(
            usage=("usage: %prog [options] word name1 "
                   "[name2 [... nameN]\n\n"
                   "names are filenames or paths: paths only "
                   "make sense with the -r option set"))
    parser.add_option("-t", "--threads", dest="count", default=7,
            type="int",
            help=("the number of threads to use(1..20) "
                  "[default %default]"))
    parser.add_option("-r", "--recurse", dest="recurse",
            default=False, action="store_true",
            help="recurse into subdirectories")
    parser.add_option("-d", "--debug", dest="debug", default=False,
                    action="store_true")
    opts, args = parser.parse_args()
    if len(args) == 0:
        parser.error("a word and at least one path must be specified")
    elif len(args) == 1:
        parser.error("at least one path must be specified")
    if (not opts.recurse and
        not any([os.path.isfile(arg) for arg in args])):
        parser.add("at least one file must be specified: or use -r")
    if not (1 <= opts.count <=20):
        parser.error("thread count must be 1..20")
    return opts, args[0], args[1:]

def get_files(args, recurse):
    filelist = []
    for path in args:
        if os.path.isfile(path):
            filelist.append(path)
        elif recurse:
            for root, dirs, files in os.walk(path):
                for filename in files:
                    filelist.append(os.path.join(root, filename))
    return filelist

# The __main__ guard lets child processes (e.g., on Windows, where forking is
# unavailable) import this module without re-running main().
if __name__ == "__main__":
    main()

By using forking (on systems that support it, such as UNIX) or child processes (on systems that do not support forking, such as Windows), the multiprocessing module can provide thread-like functionality. As a result, locking is not always necessary, and the processes can run on whatever processor cores the operating system makes available. The package provides several ways of passing data between processes, including a queue, which can be used to supply work loads to processes just as queue.Queue can be used to supply work loads to threads.
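Besides a queue, one of those other ways of passing data is a pipe. Here is a minimal sketch (the send_results function and the sample data are mine, not from the post): multiprocessing.Pipe() returns a pair of connection objects, and each side can send and receive picklable Python objects.

import multiprocessing

def send_results(conn, items):
    # Child process: send each result back through its end of the pipe.
    for item in items:
        conn.send(item.upper())
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    worker = multiprocessing.Process(target=send_results,
                                     args=(child_conn, ["alpha", "beta"]))
    worker.start()
    for _ in range(2):
        print(parent_conn.recv())   # prints "ALPHA" then "BETA"
    worker.join()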

 

The main benefit of the multiprocessing version is that, on multicore machines, it has the potential to run faster than the threaded version, because it can run its processes on as many cores as are available. Compare this with the standard Python interpreter (written in C, sometimes called CPython), which has a GIL (Global Interpreter Lock): at any one moment, only one thread may be executing Python code. This constraint is an implementation detail and does not necessarily apply to other Python interpreters, such as Jython.
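A rough way to observe this difference (the busy() function, worker count, and loop size below are illustrative assumptions, not from the post) is to time the same CPU-bound function run first by several threads and then by a pool of processes. Under CPython, the threaded timing tends to be no better than running the calls one after another because of the GIL, while the process pool can use multiple cores.

import time
import threading
import multiprocessing

def busy(n):
    # Pure-Python, CPU-bound loop: a thread running this holds the GIL.
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed_threads(n, workers):
    threads = [threading.Thread(target=busy, args=(n,))
               for _ in range(workers)]
    start = time.time()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.time() - start

def timed_processes(n, workers):
    start = time.time()
    pool = multiprocessing.Pool(workers)
    pool.map(busy, [n] * workers)
    pool.close()
    pool.join()
    return time.time() - start

if __name__ == "__main__":
    N, WORKERS = 2000000, 4
    print("threads:   {0:.2f} seconds".format(timed_threads(N, WORKERS)))
    print("processes: {0:.2f} seconds".format(timed_processes(N, WORKERS)))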

