Задание потока данных Apache Beam в python не выполняется - PullRequest
0 голосов
/ 24 октября 2019

Я создаю конвейер данных, используя луч Apache, чтобы принять XML-файл в качестве входных данных из хранилища Google (GCS) и преобразовать его в файл JSON. Я пытаюсь использовать библиотеку «xmltodict» python, чтобы сначала преобразовать XML в python dict, после чего я использую функцию python json.dumps (), чтобы преобразовать ее в формат json. Я создал отдельные классы beam.DoFn для каждого шага конвейера лучей.

Я протестировал конвейер на небольшом файле (размером менее 1 МБ), и он работал. Код запускается как на directrunner (менее 1 минуты), так и на обработчике потока данных (5-6 минут, включая запуск и остановку задания потока данных). Но когда я использую обработчик потока данных с большим файлом, например (~ 150 МБ), конвейер продолжает работать почти 1 час без какого-либо прогресса. Я не должен выяснять, что не так.

Я думаю, что проблема заключается в том, чтобы взять весь файл в качестве входных данных в одной строке, и если я смогу каким-то образом лучше прочитать XML-файл из GCS, чтобы проанализировать каждую запись как одну запись, решит эту проблему. Я буду признателен за любую помощь в улучшении этого.

Ниже приведены примеры кодов:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions

class ReadGCSfile(beam.DoFn):
    def process(self,element):      
        from apache_beam.io.gcp import gcsio
        gcs = gcsio.GcsIO()
        yield(gcs.open(element).read())

# this class converts hyphens(-) in the XML text into (_) so that column name convention is inline with #the BigQuery conventions

class clean_xml(beam.DoFn):
    def process(self,element):
        if re.search('(<.[^\>]*)-', element) == None:
            return[element]
        else:
            for i in range(len(re.findall('(<.[^\>]*)-', element))):
                y = re.search('(<.[^\>]*)-', element)
                splitpoint1 = y.span()[0]
                splitpoint2 = y.span()[1]
                element = element[:splitpoint1] + element[splitpoint1:splitpoint2].replace('-','_') + element[splitpoint2:]
            return[(element)]

class createdict(beam.DoFn):
    def process(self,element):
        import xmltodict
        order_data = xmltodict.parse(element)
        unnested_data = order_data['root1']['root2']
        return[(unnested_data)]

class converttojson(beam.DoFn):
    def process(self,element):
        import json
        import re
        for order in element:
            order_j = json.dumps(order)
            yield(order_j)

def run(argv = None):

    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)  
    google_cloud_options.project = 'project123'
    google_cloud_options.job_name = 'job123'
    google_cloud_options.staging_location = 'gs://bucket123/staging'
    google_cloud_options.temp_location = 'gs://bucket123/temp'
    google_cloud_options.machine_type = 'n1-standard-8'
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    p = beam.Pipeline(options=options)
    input_file_path = 'gs://' + input_bucket +'/'+input_file

    (p
        | 'Create' >> beam.Create([input_file_path])
        | 'GetXML' >> beam.ParDo(ReadGCSfile())
        | 'Clean_XML' >> beam.ParDo(clean_xml())
        | 'CreateDict' >> beam.ParDo(createdict())
        | 'Convert2JSON' >> beam.ParDo(converttojson())
        | 'write' >> beam.io.WriteToText('gs://bk/output',file_name_suffix='.json',num_shards =1,shard_name_template='')
    )
    p.run()

Какие улучшения я могу сделать в этом, чтобы сделать его более эффективным для больших файлов. Сейчас кажется, что конвейер застрял на этапах GetXML и Clean_XML. Как выполнить итерацию по файлу XML по одной записи за раз?

Ниже приведен пример файла данных:

<?xml version="1.0" encoding="UTF-8"?>
<root1 xmlns="http://www.example.com">
    <root2 ID-no="000000">
        <date>2022-09-23T06:58:24.000Z</date>
        <created-by>storefront</created-by>
        <original-order-no>000000</original-order-no>
        <currency>USD</currency>
        <invoice-no>11111111</invoice-no>
        <customer>
            <customer-name>abcccccc</customer-name>
            <customer-email>abccccc@gmail.com</customer-email>
            <billing-address>
                <address1>20 xyz</address1>
                <city>mars</city>
                <postal-code>123456</postal-code>
                <state-code>hhjbjh</state-code>
                <country-code>nm mn</country-code>
            </billing-address>
        </customer>
        <status>
            <order-status>NEW</order-status>
            <shipping-status>NOT_SHIPPED</shipping-status>
            <confirmation-status>CONFIRMED</confirmation-status>
            <payment-status>NOT_PAID</payment-status>
        </status>
    </root2>
</root1>

1 Ответ

1 голос
/ 24 октября 2019

Скорее всего, это связано с отсутствием параллелизма для эффективного завершения. Если у нас есть сомнения, что это связано с тем, что задание не выполняется, я бы предложил добавить счетчики для отслеживания прогресса (например, количество обработанных слов и т. Д.).

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/counters.py

...