Apache Beam Python не может проанализировать опубликованный XML - PullRequest
0 голосов
/ 09 ноября 2018

Привет! Я написал лучевой конвейер для чтения каталога и анализа загруженных опубликованных XML-файлов с использованием библиотеки pubmed_parse. Библиотека работает нормально через стандартную программу на Python, но если я перевожу это на конвейер лучей apache, как показано ниже, то при синтаксическом анализе происходит ошибка: Был бы признателен за помощь в выяснении этого

File "/home/micdsouz/venv/medline/data-preprocessing.py", line 19, in process
    pubmed_dict = pp.parse_pubmed_xml(element)
  File "/home/micdsouz/venv/local/lib/python2.7/site-packages/pubmed_parser/pubmed_oa_parser.py", line 112, in parse_pubmed_xml
    dict_article_meta = parse_article_meta(tree)
  File "/home/micdsouz/venv/local/lib/python2.7/site-packages/pubmed_parser/pubmed_oa_parser.py", line 60, in parse_article_meta
    pmid_node = article_meta.find('article-id[@pub-id-type="pmid"]')
AttributeError: 'NoneType' object has no attribute 'find' [while running 'ReadData']

from __future__ import absolute_import
import argparse
import os
import logging
import re
import pubmed_parser as pp
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class ExtractXMLTags(beam.DoFn):
    def process(self,element):
        print('Current URL = {} '.format(element))
        pubmed_dict = pp.parse_pubmed_xml(element)
        print('Dictionary output = \n {}'.format(pubmed_dict))
        yield pubmed_dict

def run(argv=None):
    """Main entry point; defines and runs the preprocessing  pipeline."""
    print('In Run - Begin processing')
    # setup the argument parser and arguments
    parser = argparse.ArgumentParser(description='program to preprocess the medline xml files and extract the important fields.')
    #Add the arguments
    print('Adding Arguments')
    parser.add_argument(
            '--input',
            default='gs://medline-221810/medline/xml_files/',
            help='Path to input files. Can be gs or local path')
    parser.add_argument(
            '--output',
            default='gs://medline-221810/medline/xml_output_files/xml_data.txt',
            help='Path to final output file.')
    parser.add_argument(
            '--batchsize',
            default=50,
            help='Batch size for the processing.')
    parser.add_argument(
            '--filenums',
            default=50,
            help='The number of filesin total to process.')

    #Get the known and additional arguments sent
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Set up the pipeline
    # Specifiy the pipeline arguments
    # include the parser folder under extra_packages
    pipeline_args.extend([
            '--runner=DataflowRunner',
            '--project=medline-221810',
            '--staging_location=gs://medline-221810/medline/staging',
            '--temp_location=gs://medline-221810/medline/temp',
            '--job_name=medline-preprocess-x1-job',
            '--extra_package=pubmed_parser.tar.gz'
            ])
    # setup pipeline options
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # set the variables from command line arguments
    num_xml_files=int(known_args.filenums)
    batch_size=known_args.batchsize
    uri=known_args.input

    # Create a list of files to be processed
    with beam.Pipeline(options=pipeline_options) as p:
        # Read the text file[pattern] into a PCollection
        print('Get the files and urls')
        print('uri = {} '.format(uri))
        gsurls = [os.path.join(uri,'pubmed18n%04d.xml.gz' % j)
                for j in range(1, num_xml_files + 1)
                if os.path.exists(os.path.join(uri,'pubmed18n%04d.xml.gz' % j))                
                ]

        print('gsurls = \n {}'.format(gsurls))
        # build the pipeline
        parsed_data = p | 'CreatePColData' >> beam.Create(gsurls) | 'ReadData' >> beam.ParDo(ExtractXMLTags())

        print('Sent to pipeline ....')
        print('Exiting run')


if __name__ == "__main__":
    # Setup logger 
    logging.getLogger().setLevel(logging.DEBUG)
    print('in main')
    # call run to begin processing
    run()

1 Ответ

0 голосов
/ 09 ноября 2018

Это похоже на проблему с библиотекой pubmed_parser. Вот две проблемы в их трекере, которые, кажется, соответствуют вашей проблеме:

...