Как я могу обрабатывать XML асинхронно в Python? - PullRequest
6 голосов
/ 19 января 2010

У меня есть большой файл данных XML (> 160 МБ) для обработки, и похоже, что разбор SAX / expat / pulldom - это путь. Я хотел бы иметь поток, который просеивает через узлы и помещает узлы для обработки в очередь, а затем другие рабочие потоки вытягивают следующий доступный узел из очереди и обрабатывают его.

У меня есть следующее (оно должно иметь блокировки, я знаю, что оно будет позже)

import sys, time
import xml.parsers.expat
import threading

q = []

def start_handler(name, attrs):
    q.append(name)

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    print(q)
    time.sleep(1)

Проблема в том, что тело блока while вызывается только один раз, и тогда я не могу даже ctrl-C прервать его. В меньших файлах вывод такой же, как и ожидалось, но, похоже, это указывает на то, что обработчик вызывается только тогда, когда документ полностью проанализирован, что, по-видимому, противоречит цели парсера SAX.

Я уверен, что это мое собственное невежество, но я не понимаю, где я совершаю ошибку.

PS: я также пытался изменить start_handler таким образом:

def start_handler(name, attrs):
    def app():
        q.append(name)
    u = threading.Thread(group=None, target=app)
    u.start()

Хотя любви нет.

Ответы [ 4 ]

7 голосов
/ 19 января 2010

ParseFile, как вы заметили, просто "глотает" все - ничего хорошего для инкрементального анализа, который вы хотите сделать!Итак, просто передавайте файл в анализатор поочередно, следя за тем, чтобы по ходу вы обеспечивали контроль над другими потоками - например:

while True:
  data = f.read(BUFSIZE)
  if not data:
    p.Parse('', True)
    break
  p.Parse(data, False)
  time.sleep(0.0)

вызов time.sleep(0.0) - это способ сказать Python"уступить другим потокам, если они готовы и ждут";метод Parse задокументирован здесь .

Второй момент: забудьте о блокировках для этого использования!- вместо этого используйте Queue.Queue , это по сути потокобезопасный и почти всегда лучший и самый простой способ координировать несколько потоков в Python.Просто создайте на нем экземпляр Queue q, q.put(name), и на q.get() сработал блок потоков, ожидающий выполнения дополнительной работы - это НАСТОЛЬКО просто!

(Есть нескольковспомогательные стратегии, которые вы можете использовать, чтобы координировать завершение рабочих потоков, когда у них больше нет работы, но простейшие, отсутствующие специальные требования - это просто сделать их потоками демонов, чтобы они все заканчивались, когда основной поток -см. документы ).

7 голосов
/ 19 января 2010

Я не слишком уверен в этой проблеме. Я предполагаю, что вызов ParseFile блокируется, и только поток синтаксического анализа выполняется из-за GIL. Обходным путем будет использование multiprocessing. Во всяком случае, он предназначен для работы с очередями.

Вы делаете Process и можете передать его Queue:

import sys, time
import xml.parsers.expat
import multiprocessing
import Queue

def do_expat(q):
    p = xml.parsers.expat.ParserCreate()

    def start_handler(name, attrs):
        q.put(name)

    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")

if __name__ == '__main__':
    q = multiprocessing.Queue()
    process = multiprocessing.Process(target=do_expat, args=(q,))
    process.start()

    elements = []
    while True:
        while True:
            try:
                elements.append(q.get_nowait())
            except Queue.Empty:
                break

        print elements
        time.sleep(1)

Я включил список элементов, просто чтобы повторить ваш оригинальный скрипт. Ваше окончательное решение, вероятно, будет использовать get_nowait и Pool или что-то подобное.

1 голос
/ 19 января 2010

Единственное, что я вижу, это то, что вы обращаетесь к q одновременно из разных потоков - без блокировки, как вы пишете. Это создает проблемы - и вы, вероятно, получаете проблемы в виде интерпретатора Python, привязанного к вам. :)

Попробуйте заблокировать, это действительно не очень сложно:

import sys, time
import xml.parsers.expat
import threading

q = []
q_lock = threading.Lock() <---

def start_handler(name, attrs):
    q_lock.acquire() <---
    q.append(name)
    q_lock.release() <---

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    q_lock.acquire() <---
    print(q)
    q_lock.release() <---
    time.sleep(1)

Видите ли, это было действительно просто, мы просто создали переменную блокировки, чтобы защитить наш объект, и получали эту блокировку каждый раз перед тем, как использовать объект, и освобождали каждый раз после того, как мы завершили нашу задачу на объекте. Таким образом, мы гарантировали, что q.append(name) никогда не будет перекрываться с print(q).


(В более новых версиях Python есть также синтаксис «с ....», который помогает вам не снимать блокировки, не закрывать файлы или другие процедуры очистки, которые часто забывают.)

0 голосов
/ 19 января 2010

Я не очень много знаю о реализации, но если синтаксическим анализом является код C, который выполняется до завершения, другие потоки Python не будут работать. Если синтаксический анализатор вызывает обратно код Python, GIL может быть освобожден для запуска других потоков, но я не уверен в этом. Возможно, вы захотите проверить эти детали.

...