Считать файл XML из HDFS для анализа в Pyspark с помощью l xml .etree - PullRequest
0 голосов
/ 08 апреля 2020

Я написал парсер в Python, используя l xml .etree, и сейчас пытаюсь запустить указанный парсер в кластере Had oop. Когда я запускаю функцию локально, она работает как положено, но я получаю следующую ошибку, когда пытаюсь применить ее к файлу в кластере (я выполняю следующее в оболочке Pyspark, python3)

xml_pathname = "hdfs://file_path/date_directory/example_one.xml"
xml_tree = etree.parse(xml_pathname)

OSError: Error reading file '/file_path/date_directory/example_one.xml': failed to load external entity 
"/file_path/date_directory/example_one.xml"

Я вижу файл, когда запускаю hdfs dfs -ls /file_path/date_directory/example_one.xml в терминале.

Две области, в которых я буду признателен за помощь -

  1. Как загрузить файлы XML в метод l xml .etree.parse () из кластера, использующего Pyspark?
  2. Как мне лучше масштабировать его для эффективной работы в Spark? Я хотел бы проанализировать миллионы XML файлов в кластере, используя мой Python синтаксический анализатор - сработает ли приведенная ниже модификация, или есть лучший способ для парралелизации и запуска синтаксического анализатора в масштабе? Вообще, как мне настроить параметры в моей конфигурации искры для оптимальных результатов (большое количество исполнителей, более одного драйвера и т. Д. c.)?
#Same as above but with wildcards to parse millions of XML files

xml_pathname = "hdfs://file_path/*/*.xml"
xml_tree = etree.parse(xml_pathname)

Работали над этим на какое-то время и был бы очень благодарен за любую помощь. Ценю вас всех

1 Ответ

0 голосов
/ 14 апреля 2020
  1. Функция mapValues ​​() оказалась полезной. Сконфигурированные Sark парсеры XML, такие как парсер Pubmed, также представили полезный шаблонный код, такой как:
path_rdd = sc.parallelize(path_sample, numSlices=10000) # use only example path
    parse_results_rdd = path_rdd.map(lambda x: Row(file_name=os.path.basename(x), **pp.parse_pubmed_xml(x)))
    pubmed_oa_df = parse_results_rdd.toDF()
    pubmed_oa_df_sel = pubmed_oa_df[['full_title', 'abstract', 'doi',
                                     'file_name', 'pmc', 'pmid',
                                     'publication_year', 'publisher_id',
                                     'journal', 'subjects']]
    pubmed_oa_df_sel.write.parquet(os.path.join(save_dir, 'pubmed_oa_%s.parquet' % date_update_str),
                                   mode='overwrite')

https://github.com/titipata/pubmed_parser/blob/master/scripts/pubmed_oa_spark.py

Использование fs.globStatus позволило получить несколько файлов XML в одном подкаталоге.
...