Я создаю конвейер данных, используя луч Apache, чтобы принять XML-файл в качестве входных данных из хранилища Google (GCS) и преобразовать его в файл JSON. Я пытаюсь использовать библиотеку «xmltodict» python, чтобы сначала преобразовать XML в python dict, после чего я использую функцию python json.dumps (), чтобы преобразовать ее в формат json. Я создал отдельные классы beam.DoFn для каждого шага конвейера лучей.
Я протестировал конвейер на небольшом файле (размером менее 1 МБ), и он работал. Код запускается как на directrunner (менее 1 минуты), так и на обработчике потока данных (5-6 минут, включая запуск и остановку задания потока данных). Но когда я использую обработчик потока данных с большим файлом, например (~ 150 МБ), конвейер продолжает работать почти 1 час без какого-либо прогресса. Я не должен выяснять, что не так.
Я думаю, что проблема заключается в том, чтобы взять весь файл в качестве входных данных в одной строке, и если я смогу каким-то образом лучше прочитать XML-файл из GCS, чтобы проанализировать каждую запись как одну запись, решит эту проблему. Я буду признателен за любую помощь в улучшении этого.
Ниже приведены примеры кодов:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
class ReadGCSfile(beam.DoFn):
def process(self,element):
from apache_beam.io.gcp import gcsio
gcs = gcsio.GcsIO()
yield(gcs.open(element).read())
# this class converts hyphens(-) in the XML text into (_) so that column name convention is inline with #the BigQuery conventions
class clean_xml(beam.DoFn):
def process(self,element):
if re.search('(<.[^\>]*)-', element) == None:
return[element]
else:
for i in range(len(re.findall('(<.[^\>]*)-', element))):
y = re.search('(<.[^\>]*)-', element)
splitpoint1 = y.span()[0]
splitpoint2 = y.span()[1]
element = element[:splitpoint1] + element[splitpoint1:splitpoint2].replace('-','_') + element[splitpoint2:]
return[(element)]
class createdict(beam.DoFn):
def process(self,element):
import xmltodict
order_data = xmltodict.parse(element)
unnested_data = order_data['root1']['root2']
return[(unnested_data)]
class converttojson(beam.DoFn):
def process(self,element):
import json
import re
for order in element:
order_j = json.dumps(order)
yield(order_j)
def run(argv = None):
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'project123'
google_cloud_options.job_name = 'job123'
google_cloud_options.staging_location = 'gs://bucket123/staging'
google_cloud_options.temp_location = 'gs://bucket123/temp'
google_cloud_options.machine_type = 'n1-standard-8'
options.view_as(StandardOptions).runner = 'DataflowRunner'
p = beam.Pipeline(options=options)
input_file_path = 'gs://' + input_bucket +'/'+input_file
(p
| 'Create' >> beam.Create([input_file_path])
| 'GetXML' >> beam.ParDo(ReadGCSfile())
| 'Clean_XML' >> beam.ParDo(clean_xml())
| 'CreateDict' >> beam.ParDo(createdict())
| 'Convert2JSON' >> beam.ParDo(converttojson())
| 'write' >> beam.io.WriteToText('gs://bk/output',file_name_suffix='.json',num_shards =1,shard_name_template='')
)
p.run()
Какие улучшения я могу сделать в этом, чтобы сделать его более эффективным для больших файлов. Сейчас кажется, что конвейер застрял на этапах GetXML и Clean_XML. Как выполнить итерацию по файлу XML по одной записи за раз?
Ниже приведен пример файла данных:
<?xml version="1.0" encoding="UTF-8"?>
<root1 xmlns="http://www.example.com">
<root2 ID-no="000000">
<date>2022-09-23T06:58:24.000Z</date>
<created-by>storefront</created-by>
<original-order-no>000000</original-order-no>
<currency>USD</currency>
<invoice-no>11111111</invoice-no>
<customer>
<customer-name>abcccccc</customer-name>
<customer-email>abccccc@gmail.com</customer-email>
<billing-address>
<address1>20 xyz</address1>
<city>mars</city>
<postal-code>123456</postal-code>
<state-code>hhjbjh</state-code>
<country-code>nm mn</country-code>
</billing-address>
</customer>
<status>
<order-status>NEW</order-status>
<shipping-status>NOT_SHIPPED</shipping-status>
<confirmation-status>CONFIRMED</confirmation-status>
<payment-status>NOT_PAID</payment-status>
</status>
</root2>
</root1>