Поток данных - XML Источник - Python - Как? - PullRequest
0 голосов
/ 05 февраля 2020

Я пытаюсь добавить файл XML в код своего потока данных. Я вижу, java имеет встроенный XMLIo, а Python нет? Я также изо всех сил пытаюсь понять, каковы начальные шаги для ParDo / DoFn это сам. Это пример файла XML ниже. Мой конвейер ниже при разборе .csv я понимаю, но я не понимаю, как начать с XML источника. Нужно ли вручную создавать PCollection и go оттуда?

Моя цель - вернуть каждый элемент в виде кортежа. Ключом будет название страны, а каждый элемент после (во вложенном массиве) будет иметь значения.

<?xml version="1.0"?>
<data>
    <country name="Liechtenstein">
        <rank>1</rank>
        <year>2008</year>
        <gdppc>141100</gdppc>
        <neighbor name="Austria" direction="E"/>
        <neighbor name="Switzerland" direction="W"/>
    </country>
    <country name="Singapore">
        <rank>4</rank>
        <year>2011</year>
        <gdppc>59900</gdppc>
        <neighbor name="Malaysia" direction="N"/>
    </country>
    <country name="Panama">
        <rank>68</rank>
        <year>2011</year>
        <gdppc>13600</gdppc>
        <neighbor name="Costa Rica" direction="W"/>
        <neighbor name="Colombia" direction="E"/>
    </country>
</data>

def run():
   argv = [
      '--project={0}'.format(PROJECT),
      '--staging_location=gs://{0}/'.format(BUCKET),
      '--temp_location=gs://{0}/'.format(BUCKET),
      '--runner=DataflowRunner'
      #'--runner=DirectRunner'
   ]

   p = beam.Pipeline(argv=argv)

   (p
      | 'ReadFromGCS' >> beam.io.textio.ReadFromText('gs://{0}/example.csv'.format(BUCKET))
-[SNIP]-

1 Ответ

0 голосов
/ 05 февраля 2020

Код ниже будет собирать информацию о каждой стране.

Вывод представляет собой список кортежей.

Первый элемент в кортеже - это название страны, а второй элемент - список других свойств страны.

import xml.etree.ElementTree as ET


xml = '''<?xml version="1.0"?>
<data>
    <country name="Liechtenstein">
        <rank>1</rank>
        <year>2008</year>
        <gdppc>141100</gdppc>
        <neighbor name="Austria" direction="E"/>
        <neighbor name="Switzerland" direction="W"/>
    </country>
    <country name="Singapore">
        <rank>4</rank>
        <year>2011</year>
        <gdppc>59900</gdppc>
        <neighbor name="Malaysia" direction="N"/>
    </country>
    <country name="Panama">
        <rank>68</rank>
        <year>2011</year>
        <gdppc>13600</gdppc>
        <neighbor name="Costa Rica" direction="W"/>
        <neighbor name="Colombia" direction="E"/>
    </country>
</data>'''

result = []
root = ET.fromstring(xml)
for country in root.findall('.//country'):
    result.append((country.attrib['name'],[x.text if x.text else x.attrib for x in list(country)]))
print(result)

output

[('Liechtenstein', ['1', '2008', '141100', {'name': 'Austria', 'direction': 'E'}, {'name': 'Switzerland','direction': 'W'}]), ('Singapore', ['4', '2011', '59900', {'name': 'Malaysia', 'direction': 'N'}]), ('Panama', ['68', '2011', '13600', {'name': 'Costa Rica', 'direction': 'W'}, {'name': 'Colombia', 'direction': 'E'}])]
...