Я пытаюсь использовать поток данных для выполнения задачи, требующей использования файлов .csv и .json.Из того, что я понимаю, я должен быть в состоянии создать файл setup.py, который будет включать эти файлы и распространять их среди нескольких работников.
Вот как мои файлы выложены:
pipline.py
setup.py
utils /
-->__init__.py
-->**CSV.csv**
-->**JSON.json**
Это мой файл setup.py:
import setuptools
setuptools.setup(name='utils',
version='0.0.1',
description='utils',
packages=setuptools.find_packages(),
package_data={'utils': ['**CSV.csv**', '**JSON.json**']},
include_package_data=True)
Это мой bean.DoFn функции:
class DoWork(beam.DoFn):
def process(self, element):
import pandas as pd
df_csv = pd.read_csv('**CSV.csv**')
df_json = pd.read_json('**JSON.json**')
Do other stuff with dataframes
yield [stuff]
Мой конвейер настроен так:
dataflow_options = ['--job_name=pipline',
'--project=pipeline',
'--temp_location=gs://pipeline/temp',
'--staging_location=gs://pipeline/stage',
'--setup_file=./setup.py']
options = PipelineOptions(dataflow_options)
gcloud_options = options.view_as(GoogleCloudOptions)
options.view_as(StandardOptions).runner = 'DataflowRunner'
with beam.Pipeline(options=options) as p:
update = p | beam.Create(files) | beam.ParDo(DoWork())
По сути, я продолжаю получать:
IOError: File CSV.csv does not exist
Он не думает, что файл .json тоже существует, но просто выдает ошибку, прежде чем достигнет этого шага.Возможно, файлы не попадают в поток данных, или я неправильно ссылаюсь на них в DoFn.Должен ли я на самом деле помещать файлы в параметр data_files функции установки вместо package_data?