деление и завоевание etree.iterparse с использованием многопроцессорной обработки - PullRequest
0 голосов
/ 22 января 2011

Итак, давайте представим большой XML-документ (размер файла> 100 МБ), который мы хотим разбить с помощью cElementTree.iterparse.

но все те ядра, которые Intel обещал нам, будут полезны, как мы их используем? вот что я хочу:

from itertools import islice
from xml.etree import ElementTree as etree

tree_iter = etree.iterparse(open("large_file.xml", encoding="utf-8"))

first = islice(tree_iter, 0, 10000)
second = islice(tree_iter, 10000)

parse_first()
parse_second()

Кажется, есть несколько проблем с этим, не в последнюю очередь то, что итератор, возвращаемый iterparse (), кажется, сопротивляется нарезке.

Есть ли способ разделить рабочую нагрузку при разборе большого XML-документа на две или четыре отдельные задачи (без загрузки всего документа в память?) С целью выполнения задач на отдельных процессорах.

1 Ответ

0 голосов
/ 22 января 2011

Я думаю, вам нужен хороший пул потоков с очередью задач для этого.Я нашел (и использую) этот очень хороший (он есть в python3, но его не должно быть слишком сложно конвертировать в 2.x):

# http://code.activestate.com/recipes/577187-python-thread-pool/

from queue import Queue
from threading import Thread

class Worker(Thread):
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try: func(*args, **kargs)
            except Exception as exception: print(exception)
            self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads): Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        self.tasks.join()

Теперь вы можете просто запустить цикл на iterparse ипусть пул потоков разделит работу за вас.Используя это просто, как это:

def executetask(arg):
    print(arg)

workers = threadpool.ThreadPool(4) # 4 is the number of threads
for i in range(100): workers.add_task(executetask, i)

workers.wait_completion() # not needed, only if you need to be certain all work is done before continuing
...