Управление зависимостями - код конвейера охватывает несколько файлов - PullRequest
0 голосов
/ 07 апреля 2020

У меня возникают проблемы при запуске потокового конвейера на DataFlowRunner после разделения «кода основного конвейера» и «кода пользовательского преобразования» на несколько файлов, как описано здесь: Зависимости нескольких файлов - нет элемент (сообщение pubs) читается в конвейер. Ни одна из вкладок - ЖУРНАЛЫ РАБОТ, ЖУРНАЛИ РАБОТНИКОВ, ОТЧЕТ ОБ ОШИБКЕ РАБОТЫ в (новом) интерфейсе потока данных - не сообщает о любых ошибках. Идентификатор задания: 2020-04-06_15_23_52-4004061030939218807 если кто-то хочет посмотреть ...

Минимальный код конвейера ( ДО ): pipe.py

row = p | "read_sub" >> pubsub.ReadFromPubSub(subscription=SUB,with_attributes=True,) \
        | "add_timestamps" >> beam.Map(add_timestamps)

add_timestamps - это мое пользовательское преобразование

def add_timestamps(e):
    payload = e.data.decode()
    return {"message":payload}

Все отлично работает, когда add_timestamps и код конвейера находятся в одном файле pipeline.py .

ПОСЛЕ Я перестроил файлы следующим образом:

root_dir/
   pipeline.py
   setup.py
   my_transforms/
      __init__py.py
      transforms.py

где, setup.py

import setuptools
setuptools.setup(
   name='my-custom-transforms-package',
   version='1.0',
   install_requires=["datetime"],
   packages= ['my_transforms'] #setuptools.find_packages(),
)

все код преобразования add_timestamps перемещен в transforms.py (в каталоге my_transforms )

В моем pipeline.py я теперь импортирую и используйте преобразование следующим образом:

from my_transforms.transforms import add_timestamps
row = p | "read_sub" >> pubsub.ReadFromPubSub(subscription=SUB,with_attributes=True,) \
        | "add_timestamps" >> beam.Map(add_timestamps)

При запуске трубопровода я устанавливаю флаг: --setup_file=./setup.py.

Однако ни один элемент не считывается в конвейер (как вы можете видеть Водяной знак данных все еще застрял, а добавленные элементы (приблизительные) ничего не сообщают)

Ответы [ 2 ]

1 голос
/ 08 апреля 2020

Я нашел причину root ... Я устанавливал флаг --no_use_public_ips и имел install_requires=["datetime"] в setup.py ..

, конечно, без внешнего IP рабочий не мог связаться с python сервером диспетчера пакетов для установки datetime. решить проблему, не устанавливая флаг --no_use_public_ips (позже я рассмотрю решение, как отключить внешние IP-адреса для рабочих и по-прежнему иметь возможность успешно работать). Было бы хорошо, если бы хоть какое-то сообщение об ошибке отображалось в журналах задания / рабочего! Потратил 2-3 дня на устранение неполадок: =)

1 голос
/ 07 апреля 2020

Я протестировал опцию зависимости нескольких файлов в потоке данных, и у меня она работает нормально. Я воспроизвел пример из Средний .

Ваша структура каталогов правильная. Добавили ли вы какой-либо импорт в файл transforms.py?

Я бы порекомендовал вам внести некоторые изменения в setup.py:

import setuptools

REQUIRED_PACKAGES = [
    ‘datetime’
]

PACKAGE_NAME = 'my_transforms'
PACKAGE_VERSION = '0.0.1'

setuptools.setup(
   name=PACKAGE_NAME,
   version=PACKAGE_VERSION,
   description='My transforms package',
   install_requires=REQUIRED_PACKAGES,
   packages=setuptools.find_packages()
)

При запуске конвейера следите за настройкой следующие поля в PipelineOptions : job_name, project, runner, staging_location, temp_location. Вы должны указать хотя бы один из temp_location или staging_location, чтобы запустить свой конвейер в облаке Google. Если вы используете Apache Beam SDK для Python 2.15.0 или новее, вы также должны указать регион. Не забудьте указать полный путь к setup.py.

Это будет выглядеть примерно так:

python3 pipeline.py \
--job_name <JOB_NAME>
--project <PROJECT_NAME> \
--runner DataflowRunner \
--region <REGION> \
--temp_location gs://<BUCKET_NAME>/temp \
--setup_file /<FULL_PATH>/setup.py

Надеюсь, это поможет.

...