Apache Flink Python Проблема с зависимостями UDF в API таблиц - PullRequest
1 голос
/ 05 мая 2020

После запуска Python Задания API таблиц, которое включает определяемые пользователем функции (UDF), отправляя его в локальный кластер, происходит сбой py4j.protocol.Py4JJavaError , вызванный

java .util.ServiceConfigurationError: org. apache .beam.sdk.options.PipelineOptionsRegistrar: org. apache .beam.sdk.options.DefaultPipelineOptionsRegistrar не является подтипом .

Я знаю, что это ошибка, связанная с зависимостями от пути к библиотеке / загрузки классов. Я уже пробовал следовать всем инструкциям по следующей ссылке: https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html

Я перепробовал множество различных конфигураций с параметром конфигурации classloader.parent-first-patterns-additional. Различные записи с org.apache.beam.sdk.[...] привели к различным дополнительным сообщениям об ошибках.

Следующие зависимости, которые относятся к apache лучу, находятся на пути lib:

  • beam-model-fn-execution-2.20.jar
  • балка-модель-управление-заданиями-2.20.jar
  • балка-модель-конвейер-2.20.jar
  • балка-бегуны-строительство-сердечник- java -2.20.jar
  • балки-бегуны- java -fn-execution-2.20.jar
  • балка-sdks- java -core-2.20.jar
  • балка-sdks- java -fn-execution-2.20.jar
  • beam-vendor-grp c -1_21_0-0.1.jar
  • beam-vendor-grp c -1_26_0.0.3.jar
  • луч-поставщик-гуава-26_0-jre-0.1.jar
  • луч-вендор-sdks- java -расширения-protobuf-2.20.jar

I также могу исключить, что это связано с моим кодом, поскольку я тестировал следующий пример кода веб-сайта проекта: https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

t_env.register_function("add", add)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource')\
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")

При выполнении этого кода то же самое появляется сообщение об ошибке.

Есть ли у кого-нибудь описание конфигурации кластера Flink, который может запускать Python задания API таблиц с UD F? Большое спасибо за все советы заранее!

1 Ответ

0 голосов
/ 20 мая 2020

Проблема решена новой версией 1.10.1 of Apache Flink. Выполнение примера сценария, показанного в вопросе, теперь возможно через двоичные файлы с помощью команды run -py path/to/script без каких-либо проблем.

Что касается зависимостей, они уже включены в уже поставленный flink_table_x.xx-1.10.1.jar. Таким образом, не нужно добавлять никаких дополнительных зависимостей к lib-path, что было сделано в вопросе при попытке отладки / настройки.

...