У меня возникают проблемы при запуске потокового конвейера на DataFlowRunner
после разделения «кода основного конвейера» и «кода пользовательского преобразования» на несколько файлов, как описано здесь: Зависимости нескольких файлов - нет элемент (сообщение pubs) читается в конвейер. Ни одна из вкладок - ЖУРНАЛЫ РАБОТ, ЖУРНАЛИ РАБОТНИКОВ, ОТЧЕТ ОБ ОШИБКЕ РАБОТЫ в (новом) интерфейсе потока данных - не сообщает о любых ошибках. Идентификатор задания: 2020-04-06_15_23_52-4004061030939218807
если кто-то хочет посмотреть ...
Минимальный код конвейера ( ДО ): pipe.py
row = p | "read_sub" >> pubsub.ReadFromPubSub(subscription=SUB,with_attributes=True,) \
| "add_timestamps" >> beam.Map(add_timestamps)
add_timestamps
- это мое пользовательское преобразование
def add_timestamps(e):
payload = e.data.decode()
return {"message":payload}
Все отлично работает, когда add_timestamps
и код конвейера находятся в одном файле pipeline.py .
ПОСЛЕ Я перестроил файлы следующим образом:
root_dir/
pipeline.py
setup.py
my_transforms/
__init__py.py
transforms.py
где, setup.py
import setuptools
setuptools.setup(
name='my-custom-transforms-package',
version='1.0',
install_requires=["datetime"],
packages= ['my_transforms'] #setuptools.find_packages(),
)
все код преобразования add_timestamps
перемещен в transforms.py (в каталоге my_transforms )
В моем pipeline.py я теперь импортирую и используйте преобразование следующим образом:
from my_transforms.transforms import add_timestamps
row = p | "read_sub" >> pubsub.ReadFromPubSub(subscription=SUB,with_attributes=True,) \
| "add_timestamps" >> beam.Map(add_timestamps)
При запуске трубопровода я устанавливаю флаг: --setup_file=./setup.py
.
Однако ни один элемент не считывается в конвейер (как вы можете видеть Водяной знак данных все еще застрял, а добавленные элементы (приблизительные) ничего не сообщают)