Многопроцессорная обработка большого XML-файла со сложными объектами общей памяти - PullRequest
0 голосов
/ 22 октября 2018

Я нахожусь в процессе улучшения программы, которая анализирует XML, классифицирует и индексирует свои поддеревья.Реальная программа слишком велика, чтобы показывать ее здесь, поэтому я привел ее к минимальному тестовому примеру, показывающему проблему, с которой я сталкиваюсь.

Идея такова:

  1. Обрабатывать файлы XML вкаталог, один за другим
  2. Обрабатывать все alpino_ds узлов в файле параллельно
  3. Во время этого процесса процессу требуется доступ на чтение / запись к общим переменным, чтобы, например, мы моглипроверьте, сколько раз атрибут встречался в общей сложности, или отследите дескрипторы файлов

Обратите внимание, что в реальном коде есть еще несколько предостережений:

  • просто возвращая новыйзначения для процесса и последующее объединение их в главном потоке кажется нецелесообразным и, по-видимому, довольно медленным, поскольку фактическая структура данных имеет глубину dict с четырьмя уровнями, состоящую из dicts, set с, int с и string s, а также dict-to-filehandle и Counter() объекты;
  • Я пытался использовать потоки (с ThreadPoolExecutor), и хотя было некоторое усиление (Iрассчитано около 5% улучшенияЭто не достаточно хорошо для меня;
  • фактические данные, с которыми я работаю, могут состоять из файлов XML размером более 60 ГБ или до 15 миллионов alpino_ds тегов на файл.Это главная причина, по которой я хочу работать параллельно - просто 1031 *, поэтому данных.Это означает, что вложенные объекты также становятся достаточно большими, поэтому объединение / совместное использование этих объектов между процессами само по себе может быть узким местом.

Пример кода:

from pathlib import Path
from collections import Counter
from copy import copy
from lxml import etree

import concurrent.futures


class XmlGrinder:
    def __init__(self, m=1):
        if m is False:
            self.m = 1
        elif m == 0:
            self.m = None
        else:
            self.m = m

        self.max_a = 7
        self.max_b = 1000

        self.pdin = self.pdout = None
        self.pattern_counter = self.fhs = self.corpus = None

    def grind(self, din, dout):
        self.pdin = Path(din)
        self.pdout = Path(dout)

        for file in self.pdin.glob('*.xml'):
            self._grind_xml(file)

    def _grind_xml(self, pfin):
        self.pattern_counter = Counter()
        self.filenames = set()
        self.fhs = {}
        self.corpus = pfin.stem

        with concurrent.futures.ProcessPoolExecutor(max_workers=self.m) as executor:
            jobs = []
            context = etree.iterparse(str(pfin), tag='alpino_ds')

            for _, node in context:
                attrs = node.attrib
                # node has to have id
                if 'id' not in attrs:
                    continue

                jobs.append(executor.submit(self._process_node, etree.tostring(node)))

                # Makes sure our memory usage is kept in check by getting rid of unused elements
                # Borrowed from https://stackoverflow.com/a/7171543/1150683
                node.clear()
                # Also eliminate now-empty references from the root node to elem
                for ancestor in node.xpath('ancestor-or-self::*'):
                    while ancestor.getprevious() is not None:
                        del ancestor.getparent()[0]

            # Get rid of xml iterator
            del context

            sentence_nr = 0
            for job in concurrent.futures.as_completed(jobs):
                sentence_nr += 1
                print(f"Processed {self.corpus} sentence {sentence_nr:,d}", job.result(), flush=True)

            # self.* variables are empty! :-(
            print('pattern counter:', self.pattern_counter)
            print('filenames:', self.filenames)
            print('filehandles:', self.fhs)

            # won't do anything because fh is empty:
            for fh in self.fhs.values():
                fh.close()

    def _process_node(self, xml_str):
        node = etree.fromstring(xml_str)

        all_cats = ''
        for subnode in node.iter('node'):
            children_size = sum(1 for _ in subnode.iterchildren('node'))
            descendants_size = sum(1 for _ in subnode.iter('node'))
            # Size requirements of children and descendants
            if children_size < 1 \
                    or self.max_a < children_size \
                    or descendants_size > self.max_b:
                continue

            # get attribute of node
            cat = subnode.attrib['cat']
            all_cats += cat
            self.pattern_counter[cat] += 1

            # Create new XML tree
            tree_xml = etree.Element('tree', {
                'index': f"{cat}-{self.pattern_counter[cat]}"
            })
            tree_xml.append(copy(subnode))

            # open filehandle and write new tree to file
            if cat not in self.fhs:
                (self.pdout / self.corpus).mkdir(exist_ok=True, parents=True)
                tree_filename = self.pdout / self.corpus / f"{self.corpus}-{cat}-trees.xml"
                # open file handle and keep it open, only close after loop
                self.fhs[cat] = tree_filename.open(mode='a', encoding='utf-8')

            self.fhs[cat].write('\n\t\t' + etree.tostring(tree_xml, encoding='unicode'))

        return all_cats


if __name__ == '__main__':
    # use m=[int] to enable multiple cores or m=0 to utilise all cores
    xml_grindr = XmlGrinder()
    xml_grindr.grind(r'../data', r'../output')

Пример XML(сохраните его в файл XML и поместите в каталог; используйте этот каталог в качестве первого аргумента xml_grindr.grind()):

<?xml version="1.0" encoding="UTF-8"?><treebank><alpino_ds version="1.3" id="18.head.1.s.1"><node begin="0" cat="top" end="4" id="0" rel="top"><node begin="0" cat="conj" end="4" id="1" rel="--"><node begin="0" end="1" frame="within_word_conjunct" id="2" lcat="np" lemma="_" pos="prefix" postag="SPEC(afgebr)" pt="spec" rel="cnj" root="taal" sense="taal" spectype="afgebr" word="taal-"/><node begin="1" conjtype="neven" end="2" frame="conj(en)" id="3" lcat="vg" lemma="en" pos="vg" postag="VG(neven)" pt="vg" rel="crd" root="en" sense="en" word="en"/><node begin="2" cat="mwu" end="4" id="4" mwu_root="spraaktechnologienieuws jul&apos;03" mwu_sense="spraaktechnologienieuws jul&apos;03" rel="cnj"><node begin="2" end="3" frame="proper_name(both)" genus="onz" getal="ev" graad="basis" id="5" lcat="np" lemma="spraaktechnologienieuws" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,onz,stan)" pt="n" rel="mwp" root="spraaktechnologienieuws" sense="spraaktechnologienieuws" word="spraaktechnologienieuws"/><node begin="3" end="4" frame="proper_name(both)" id="6" lcat="np" lemma="_" num="both" pos="name" postag="SPEC(symb)" pt="spec" rel="mwp" root="jul&apos;03" sense="jul&apos;03" spectype="symb" word="jul&apos;03"/></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.2.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="noun(de,count,sg)" gen="de" id="1" lcat="np" lemma="1" num="sg" numtype="hoofd" pos="noun" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="1" sense="1" word="1"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.2.s.2"><node begin="0" cat="top" end="5" id="0" rel="top"><node begin="0" cat="mwu" end="5" id="1" mwu_root="DISCUSSIE OVER TAALTECHNOLOGIE IN TAALSCHRIFT" mwu_sense="DISCUSSIE OVER TAALTECHNOLOGIE IN TAALSCHRIFT" rel="--"><node begin="0" buiging="met-e" end="1" frame="proper_name(both)" graad="basis" id="2" lcat="np" lemma="Discussie" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="DISCUSSIE" sense="DISCUSSIE" word="DISCUSSIE"/><node begin="1" end="2" frame="proper_name(both)" id="3" lcat="np" lemma="over" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="OVER" sense="OVER" vztype="init" word="OVER"/><node begin="2" end="3" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="4" lcat="np" lemma="taaltechnologie" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="TAALTECHNOLOGIE" sense="TAALTECHNOLOGIE" word="TAALTECHNOLOGIE"/><node begin="3" end="4" frame="proper_name(both)" id="5" lcat="np" lemma="iN" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="IN" sense="IN" vztype="init" word="IN"/><node begin="4" end="5" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="6" lcat="np" lemma="taalschrift" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="TAALSCHRIFT" sense="TAALSCHRIFT" word="TAALSCHRIFT"/></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.3.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="2" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="2" sense="2" special="hoofd" word="2"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.3.s.2"><node begin="0" cat="top" end="7" id="0" rel="top"><node begin="0" cat="mwu" end="7" id="1" mwu_root="AMERIKAANSE OVERHEID KIEST VOOR LINKFACTORY VAN L&amp;C" mwu_sense="AMERIKAANSE OVERHEID KIEST VOOR LINKFACTORY VAN L&amp;C" rel="--"><node begin="0" buiging="met-e" end="1" frame="proper_name(both)" graad="basis" id="2" lcat="np" lemma="Amerikaans" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="AMERIKAANSE" sense="AMERIKAANSE" word="AMERIKAANSE"/><node begin="1" end="2" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="3" lcat="np" lemma="overheid" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="OVERHEID" sense="OVERHEID" word="OVERHEID"/><node begin="2" end="3" frame="proper_name(both)" id="4" lcat="np" lemma="kiezen" num="both" pos="name" postag="WW(pv,tgw,met-t)" pt="ww" pvagr="met-t" pvtijd="tgw" rel="mwp" root="KIEST" sense="KIEST" word="KIEST" wvorm="pv"/><node begin="3" end="4" frame="proper_name(both)" id="5" lcat="np" lemma="voor" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="VOOR" sense="VOOR" vztype="init" word="VOOR"/><node begin="4" conjtype="neven" end="5" frame="proper_name(both)" id="6" lcat="np" lemma="linkfactory" num="both" pos="name" postag="VG(neven)" pt="vg" rel="mwp" root="LINKFACTORY" sense="LINKFACTORY" word="LINKFACTORY"/><node begin="5" end="6" frame="proper_name(both)" id="7" lcat="np" lemma="van" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="VAN" sense="VAN" vztype="init" word="VAN"/><node begin="6" end="7" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="8" lcat="np" lemma="l&amp;amp;C" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="L&amp;C" sense="L&amp;C" word="L&amp;C"/></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.4.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="3" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="3" sense="3" special="hoofd" word="3"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.4.s.2"><node begin="0" cat="top" end="6" id="0" rel="top"><node begin="0" cat="np" end="6" id="1" rel="--"><node begin="0" buiging="met-e" end="1" frame="noun(both,both,both)" gen="both" graad="basis" id="2" lcat="np" lemma="Spraakgestuurde" naamval="stan" num="both" pos="noun" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="hd" root="spraakgestuurde" sense="spraakgestuurde" word="SPRAAKGESTUURDE"/><node begin="1" cat="mwu" end="6" id="3" mwu_root="LAST-MINUTE TAALCURSUS VIA DE TELEFOON" mwu_sense="LAST-MINUTE TAALCURSUS VIA DE TELEFOON" rel="app"><node begin="1" buiging="met-e" end="2" frame="proper_name(both)" graad="basis" id="4" lcat="np" lemma="Last-minuat" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="LAST-MINUTE" sense="LAST-MINUTE" word="LAST-MINUTE"/><node begin="2" end="3" frame="proper_name(both)" getal="mv" graad="basis" id="5" lcat="np" lemma="taalcursus" ntype="soort" num="both" pos="name" postag="N(soort,mv,basis)" pt="n" rel="mwp" root="TAALCURSUS" sense="TAALCURSUS" word="TAALCURSUS"/><node begin="3" end="4" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="6" lcat="np" lemma="Via" naamval="stan" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,zijd,stan)" pt="n" rel="mwp" root="VIA" sense="VIA" word="VIA"/><node begin="4" end="5" frame="proper_name(both)" id="7" lcat="np" lemma="dE" lwtype="bep" naamval="stan" npagr="rest" num="both" pos="name" postag="LID(bep,stan,rest)" pt="lid" rel="mwp" root="DE" sense="DE" word="DE"/><node begin="5" end="6" frame="proper_name(both)" getal="mv" graad="basis" id="8" lcat="np" lemma="telefoon" ntype="soort" num="both" pos="name" postag="N(soort,mv,basis)" pt="n" rel="mwp" root="TELEFOON" sense="TELEFOON" word="TELEFOON"/></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.5.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="4" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="4" sense="4" special="hoofd" word="4"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.5.s.2"><node begin="0" cat="top" end="5" id="0" rel="top"><node begin="0" cat="np" end="5" id="1" rel="--"><node begin="0" end="1" frame="noun(de,count,pl)" gen="de" id="2" lcat="np" lemma="_" num="pl" pos="noun" postag="SPEC(deeleigen)" pt="spec" rel="hd" root="aio_vacature" sense="aio_vacature" spectype="deeleigen" word="AIO-VACATURES"/><node begin="1" cat="pp" end="5" id="3" rel="mod"><node begin="1" end="2" frame="preposition(in,[])" id="4" lcat="pp" lemma="iN" pos="prep" postag="VZ(init)" pt="vz" rel="hd" root="in" sense="in" vztype="init" word="IN"/><node begin="2" cat="conj" end="5" id="5" rel="obj1"><node begin="2" end="3" frame="proper_name(both,'LOC')" genus="onz" getal="ev" graad="basis" id="6" lcat="np" lemma="Tilburg" naamval="stan" neclass="LOC" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="cnj" root="TILBURG" sense="TILBURG" word="TILBURG"/><node begin="3" conjtype="neven" end="4" frame="conj(en)" id="7" lcat="vg" lemma="eN" pos="vg" postag="VG(neven)" pt="vg" rel="crd" root="en" sense="en" word="EN"/><node begin="4" end="5" frame="proper_name(both,'LOC')" genus="onz" getal="ev" graad="basis" id="8" lcat="np" lemma="Amsterdam" naamval="stan" neclass="LOC" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="cnj" root="AMSTERDAM" sense="AMSTERDAM" word="AMSTERDAM"/></node></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.p.1.s.1"><node begin="0" cat="top" end="7" id="0" rel="top"><node begin="0" cat="smain" end="7" id="1" rel="--"><node begin="0" end="1" frame="noun(de,count,pl)" gen="de" getal="mv" graad="basis" id="2" index="1" lcat="np" lemma="computer" ntype="soort" num="pl" pos="noun" postag="N(soort,mv,basis)" pt="n" rel="su" root="computer" sense="computer" word="Computers"/><node begin="1" end="2" frame="verb(hebben,pl,aux(te_inf))" id="3" infl="pl" lcat="smain" lemma="hoeven" pos="verb" postag="WW(pv,tgw,mv)" pt="ww" pvagr="mv" pvtijd="tgw" rel="hd" root="hoef" sc="aux(te_inf)" sense="hoef" tense="present" word="hoeven" wvorm="pv"/><node begin="0" cat="ti" end="7" id="4" rel="vc"><node begin="4" end="5" frame="complementizer(te)" id="5" lcat="cp" lemma="te" pos="comp" postag="VZ(init)" pt="vz" rel="cmp" root="te" sc="te" sense="te" vztype="init" word="te"/><node begin="0" cat="inf" end="7" id="6" rel="body"><node begin="0" end="1" id="7" index="1" rel="su"/><node begin="5" buiging="zonder" end="6" frame="verb('hebben/zijn',inf,aux(inf))" id="8" infl="inf" lcat="inf" lemma="kunnen" pos="verb" positie="vrij" postag="WW(inf,vrij,zonder)" pt="ww" rel="hd" root="kan" sc="aux(inf)" sense="kan" word="kunnen" wvorm="inf"/><node begin="0" cat="inf" end="7" id="9" rel="vc"><node begin="0" end="1" id="10" index="1" rel="su"/><node begin="2" cat="np" end="4" id="11" rel="obj1"><node begin="2" buiging="zonder" end="3" frame="determiner(geen,nwh,mod,pro,yparg,nwkpro,geen)" id="12" infl="geen" lcat="detp" lemma="geen" naamval="stan" npagr="agr" pdtype="det" pos="det" positie="prenom" postag="VNW(onbep,det,stan,prenom,zonder,agr)" pt="vnw" rel="det" root="geen" sense="geen" vwtype="onbep" wh="nwh" word="geen"/><node begin="3" end="4" frame="noun(het,mass,sg)" gen="het" genus="onz" getal="ev" graad="basis" id="13" lcat="np" lemma="Nederlands" naamval="stan" ntype="eigen" num="sg" pos="noun" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="hd" root="Nederlands" sense="Nederlands" word="Nederlands"/></node><node begin="6" buiging="zonder" end="7" frame="verb(hebben,inf(no_e),transitive)" id="14" infl="inf(no_e)" lcat="inf" lemma="verstaan" pos="verb" positie="vrij" postag="WW(inf,vrij,zonder)" pt="ww" rel="hd" root="versta" sc="transitive" sense="versta" word="verstaan" wvorm="inf"/></node></node></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.p.2.s.2"><node begin="0" cat="top" end="14" id="0" rel="top"><node begin="7" end="8" frame="punct(komma)" id="1" lcat="punct" lemma="," pos="punct" postag="LET()" pt="let" rel="--" root="," sense="," special="komma" word=","/><node begin="10" end="11" frame="punct(komma)" id="2" lcat="punct" lemma="," pos="punct" postag="LET()" pt="let" rel="--" root="," sense="," special="komma" word=","/><node begin="0" cat="smain" end="13" id="3" rel="--"><node begin="0" end="1" frame="er_adverb(voor)" id="4" lcat="pp" lemma="daarvoor" pos="pp" postag="BW()" pt="bw" rel="mod" root="daarvoor" sense="daarvoor" special="er" word="Daarvoor"/><node begin="1" end="2" frame="verb(unacc,sg3,intransitive)" id="5" infl="sg3" lcat="smain" lemma="verlopen" pos="verb" postag="WW(pv,tgw,met-t)" pt="ww" pvagr="met-t" pvtijd="tgw" rel="hd" root="verloop" sc="intransitive" sense="verloop" tense="present" word="verloopt" wvorm="pv"/><node begin="2" cat="np" end="5" id="6" rel="su"><node begin="2" end="3" frame="determiner(de)" id="7" infl="de" lcat="detp" lemma="de" lwtype="bep" naamval="stan" npagr="rest" pos="det" postag="LID(bep,stan,rest)" pt="lid" rel="det" root="de" sense="de" word="de"/><node aform="base" begin="3" buiging="met-e" end="4" frame="adjective(e)" graad="basis" id="8" infl="e" lcat="ap" lemma="menselijk" naamval="stan" pos="adj" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mod" root="menselijk" sense="menselijk" vform="adj" word="menselijke"/><node begin="4" end="5" frame="noun(de,count,sg)" gen="de" genus="zijd" getal="ev" graad="basis" id="9" lcat="np" lemma="communicatie" naamval="stan" ntype="soort" num="sg" pos="noun" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="hd" root="communicatie" sense="communicatie" word="communicatie"/></node><node begin="5" cat="ap" end="7" id="10" rel="mod"><node begin="5" end="6" frame="intensifier" id="11" lcat="advp" lemma="te" pos="adv" postag="BW()" pt="bw" rel="mod" root="te" sense="te" special="intensifier" word="te"/><node aform="base" begin="6" buiging="zonder" end="7" frame="adjective(no_e(adv))" graad="basis" id="12" infl="no_e" lcat="ap" lemma="subtiel" pos="adj" positie="vrij" postag="ADJ(vrij,basis,zonder)" pt="adj" rel="hd" root="subtiel" sense="subtiel" vform="adj" word="subtiel"/></node><node begin="8" cat="ppart" end="10" id="13" rel="mod"><node begin="8" end="9" frame="intensifier" id="14" lcat="advp" lemma="te" pos="adv" postag="VZ(init)" pt="vz" rel="mod" root="te" sense="te" special="intensifier" vztype="init" word="te"/><node aform="base" begin="9" buiging="zonder" end="10" frame="adjective(ge_no_e(adv))" id="15" infl="no_e" lcat="ppart" lemma="nuanceren" pos="adj" positie="vrij" postag="WW(vd,vrij,zonder)" pt="ww" rel="hd" root="genuanceerd" sense="genuanceerd" vform="psp" word="genuanceerd" wvorm="vd"/></node><node begin="11" cat="ap" end="13" id="16" rel="mod"><node begin="11" end="12" frame="intensifier" id="17" lcat="advp" lemma="te" pos="adv" postag="BW()" pt="bw" rel="mod" root="te" sense="te" special="intensifier" word="te"/><node aform="base" begin="12" buiging="zonder" end="13" frame="adjective(no_e(adv))" graad="basis" id="18" infl="no_e" lcat="ap" lemma="rijk" pos="adj" positie="vrij" postag="ADJ(vrij,basis,zonder)" pt="adj" rel="hd" root="rijk" sense="rijk" vform="adj" word="rijk"/></node></node><node begin="13" end="14" frame="punct(punt)" id="19" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds></treebank>

Pipfile, если вы хотите настроить локальную средудля тестирования (вышеупомянутый скрипт, вероятно, работает с 3.4 и выше, хотя):

[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
lxml = ">=4.2.1"

[dev-packages]

[requires]
python_version = "3.6"

В зависимости от того, как вы назвали XML, вывод скрипта будет выглядеть примерно так:

Processing WRPEE-dummy sentence 1 topconjmwu
Processing WRPEE-dummy sentence 2 top
Processing WRPEE-dummy sentence 3 topmwu
Processing WRPEE-dummy sentence 4 top
Processing WRPEE-dummy sentence 5 topmwu
Processing WRPEE-dummy sentence 6 top
Processing WRPEE-dummy sentence 7 topnpmwu
Processing WRPEE-dummy sentence 8 top
Processing WRPEE-dummy sentence 9 topnpppconj
Processing WRPEE-dummy sentence 10 topsmaintiinfinfnp
Processing WRPEE-dummy sentence 11 topsmainnpapppartap
pattern counter: Counter()
filenames: set()
filehandles: {}

Это показывает, что переменные, которые я хочу разделить между процессами, действительно не являются общими или измененными.(Я знаю, что это происходит потому, что процесс и его переменные разветвлены.) В этом фиктивном примере это может показаться не очень важным (хотя ясно, что теперь я не могу закрыть дескрипторы открытого файла), но в целомВ программе эти и другие переменные модифицируются и используются внутри разветвленных процессов.Вопрос в том, как заставить это работать.

Я читал о Pipe () и Queue (), и мне кажется, что мне понадобится Queue().Кроме того, поскольку я буду очень часто читать и писать из разных процессов в один и тот же объект, я думаю, что мне также понадобится Manager().Это та часть, в которой я не уверен, однако.Я попытался прочитать эту тему , но это только смутило меня больше.Кроме того, комментарии предполагают , что даже на 3.6.4 могут быть проблемы при использовании сложных объектов.Помните, что фактические данные, которыми я делюсь, являются вложенными и состоят из разных типов.

Таким образом, в итоге возникает вопрос: как мне переписать приведенный выше пример кода, чтобы гарантировать, что все Process esиметь (неблокирующий) доступ на чтение / запись к переменным экземпляра внутри _process_node и к методам, которые вызываются из этого процесса?Я готов обновить мою текущую версию Python (3.6.4) до 3.7 и использовать дополнительные библиотеки.

1 Ответ

0 голосов
/ 29 октября 2018

Библиотека multiprocessing позволяет использовать параллелизм в параллельном коде Python.Без multiprocessing Python GIL имеет тенденцию мешать истинному параллельному выполнению, но вы должны увидеть multiprocessing код, который ничем не отличается от других методов параллелизма.По сути, самое большое различие между multiprocessing и потоками заключается в том, что состояние разделяется с помощью медленных вызовов IPC.

Это означает, что вам необходимо тщательно обрабатывать общих ресурсов .Ваша текущая реализация не справляется с этой задачей;у вас есть несколько одновременных задач доступа к общим ресурсам независимо от того, что могут делать другие.В вашем коде существует множество возможностей для состязаний, когда несколько задач могут записывать в один и тот же файл или когда обновляется вложенная структура данных без учета других обновлений.

Когда вам нужно обновить общие структуры данных илифайлы, вы обычно можете выбрать один из двух вариантов:

  • Использовать синхронизацию;любой, кому необходимо изменить ресурс, должен сначала получить общую блокировку или использовать какой-либо другой вид примитива синхронизации для координации доступа.
  • Сделать единственную задачу ответственной за изменение ресурса.Обычно это включает одну или несколько очередей .

Обратите внимание, что вам придется явно передавать любой из этих объектов (примитивы или очереди синхронизации) дочерним процессам, см. рекомендации по программированию ;не используйте ссылки на экземпляр для совместного использования состояния.

Для вашего случая я бы пошел с очередями и выделенными задачами;Ваше узкое место - обработка данных, запись данных на диск и обновление нескольких структур данных с результатами задач анализа относительно быстро по сравнению.

Поэтому используйте одну задачу для записи в файлы;просто поместите сериализованную строку XML вместе со значением cat в выделенную очередь и выполните отдельную задачу, которая извлекает их из очереди и записывает их в файлы.Эта отдельная задача отвечает за доступ ко всем файлам, включая открытие и закрытие.Это сериализует доступ к файлу и удаляет возможность для состязаний и записи с перебоями.Если данные для этих файлов получаются настолько плотными и быстрыми, что делают эту задачу узким местом, создавайте задачи для целевого файла.

Сделайте то же самое для общих структур данных;отправьте мутации в очередь, оставьте ее выделенной задаче для объединения данных.Обновление прокси-объектов не очень подходит, потому что их изменения распространяются на другие процессы с помощью вызовов RPC, что увеличивает шансы на состояние гонки и блокировку не гарантирует согласованность данных во всех процессах задачи!

Для вашего простогоНапример, обновления объекта Counter() на самом деле не являются общими;каждый дочерний процесс наследует копию, когда он разветвляет и обновляет эту локальную копию, и родительский процесс никогда не увидит сделанные изменения.Таким образом, вы используете локальный, новый Counter() экземпляр и помещаете его в очередь.Затем выделенная задача может получать их из очереди и обновлять локальный экземпляр Counter() значениями, используя total_counter.update(queued_counter), снова гарантируя, что обновления сериализуются.

Для иллюстрации приведу надуманный пример подсчета LoremДанные ипсума;серия задач count_words выполняет подсчет, но передает объект Counter(), который они производят, в очередь для отдельной задачи сопоставления, чтобы объединить их в окончательный счетчик слов.Отдельная задача ведения журнала записывает данные из очереди ведения журнала на диск:

import datetime
import random
import re
import time

from collections import Counter
from functools import partial
from multiprocessing import Manager, Pool
from io import TextIOWrapper
from urllib.request import urlopen

COMPLETE = "COMPLETE"

def collating_task(countsqueue, logqueue):
    wordcounts = Counter()

    # Loop until COMPLETE is found in the queue
    for counts in iter(countsqueue.get, COMPLETE):
        wordcounts.update(counts)
        logqueue.put(
            f"collating: updating with {len(counts)} words "
            f"(total {len(wordcounts)})"
        )

    return wordcounts

def logging_task(logqueue):
    # Loop until COMPLETE is found in the queue
    with open('logfile.txt', 'w') as logf:
        for message in iter(logqueue.get, COMPLETE):
            print(datetime.datetime.now(), message, flush=True, file=logf)

def count_words(line, countsqueue, logqueue):
    findwords = re.compile(r"\w+").findall
    counts = Counter(findwords(line))
    logqueue.put(f"counting: counted {len(counts)} words")
    # a random short delay to make this task 'heavy'
    time.sleep(random.uniform(0.0, 0.05))
    countsqueue.put(counts)

def main():
    # Random latin text, 1000 paragraphs
    loripsum_response = urlopen("https://loripsum.net/api/1000/long/plaintext")
    text = list(TextIOWrapper(loripsum_response, encoding="utf8"))
    print(f"Will process {len(text)} lines of data")

    # create managed queues that can be passed in as arguments. The alternative
    # is to create globals or Process() objects with queues passed in.
    manager = Manager()
    countsqueue, logqueue = manager.Queue(), manager.Queue()
    with Pool() as pool:
        # start processing tasks, these loop forever until signalled
        collator = pool.apply_async(collating_task, (countsqueue, logqueue))
        logger = pool.apply_async(logging_task, (logqueue,))

        # process lines, blocks until complete
        pool.map(partial(count_words, countsqueue=countsqueue, logqueue=logqueue), text)

        countsqueue.put(COMPLETE)
        wordcounts = collator.get()
        logqueue.put(COMPLETE)
        logger.wait()

    print(f"Counted {len(wordcounts)} different words; top 5 is:")
    for word, count in wordcounts.most_common(5):
        print(f'{word:<10} {count:4d}')

if __name__ == "__main__":
    main()

, что приводит к чему-то вроде:

Will process 2000 lines of data
Counted 5651 different words; top 5 is:
et         2078
in         2074
est        2036
non        1911
ut         1477

и большому logfile.txt с информацией, помещаемой в очередь ведения журнала.

...