Присоединение файла данных (.csv, .json) как часть пакета установки, который будет использоваться в потоке данных - PullRequest
0 голосов
/ 28 ноября 2018

Я пытаюсь использовать поток данных для выполнения задачи, требующей использования файлов .csv и .json.Из того, что я понимаю, я должен быть в состоянии создать файл setup.py, который будет включать эти файлы и распространять их среди нескольких работников.

Вот как мои файлы выложены:

pipline.py
setup.py
utils /
  -->__init__.py
  -->**CSV.csv**
  -->**JSON.json**

Это мой файл setup.py:

import setuptools

setuptools.setup(name='utils',
                 version='0.0.1',
                 description='utils',
                 packages=setuptools.find_packages(),
                 package_data={'utils': ['**CSV.csv**', '**JSON.json**']},
                 include_package_data=True)

Это мой bean.DoFn функции:

class DoWork(beam.DoFn):
    def process(self, element):

        import pandas as pd

        df_csv = pd.read_csv('**CSV.csv**')
        df_json = pd.read_json('**JSON.json**')

        Do other stuff with dataframes

        yield [stuff]

Мой конвейер настроен так:

dataflow_options = ['--job_name=pipline',
                    '--project=pipeline',
                    '--temp_location=gs://pipeline/temp',
                    '--staging_location=gs://pipeline/stage',
                    '--setup_file=./setup.py']

options = PipelineOptions(dataflow_options)
gcloud_options = options.view_as(GoogleCloudOptions)
options.view_as(StandardOptions).runner = 'DataflowRunner'

with beam.Pipeline(options=options) as p:
    update = p | beam.Create(files) | beam.ParDo(DoWork())

По сути, я продолжаю получать:

IOError: File CSV.csv does not exist

Он не думает, что файл .json тоже существует, но просто выдает ошибку, прежде чем достигнет этого шага.Возможно, файлы не попадают в поток данных, или я неправильно ссылаюсь на них в DoFn.Должен ли я на самом деле помещать файлы в параметр data_files функции установки вместо package_data?

1 Ответ

0 голосов
/ 28 ноября 2018

вам нужно загрузить входные файлы в формате GS и указать местоположение GS, а не CSV .Я думаю, что вы запустили код локально, имея CSV-файл в том же каталоге, что и код.Но для его запуска с использованием DataflowRunner понадобятся файлы в gs.

...