Apache Flink: коннектор Kafka в потоковом API Python, «Не удается загрузить класс пользователя» - PullRequest
0 голосов
/ 10 октября 2018

Я испытываю новый потоковый API-интерфейс от Flink и пытаюсь запустить мой скрипт с ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py.Сценарий python довольно прост, я просто пытаюсь извлечь из существующей темы и отправить все на стандартный вывод (или файл * .out в каталоге журнала, где метод вывода по умолчанию генерирует данные).

import glob
import os
import sys
from java.util import Properties
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.streaming.api.collector.selector import OutputSelector
from org.apache.flink.api.common.serialization import SimpleStringSchema

directories=['/home/user/flink/flink-1.6.1/lib']
for directory in directories:
    for jar in glob.glob(os.path.join(directory,'*.jar')):
                sys.path.append(jar)

from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09

props = Properties()
config = {"bootstrap_servers": "localhost:9092",
          "group_id": "flink_test",
          "topics": ["TopicCategory-TopicName"]}
props.setProperty("bootstrap.servers", config['bootstrap_servers'])
props.setProperty("group_id", config['group_id'])
props.setProperty("zookeeper.connect", "localhost:2181")

def main(factory):
    consumer = FlinkKafkaConsumer09([config["topics"]], SimpleStringSchema(), props)

    env = factory.get_execution_environment()
    env.add_java_source(consumer) \
        .output()
    env.execute()

Я взял несколько файлов jar из репозиториев Maven, а именно flink-connector-kafka-0.9_2.11-1.6.1.jar, flink-connector-kafka-base_2.11-1.6.1.jar и kafka-clients-0.9.0.1.jar, и скопировал их в каталог Flink lib.Если я не неправильно понял документацию, этого должно хватить, чтобы Flink загрузил разъем kafka.Действительно, если я удаляю любую из этих банок, импорт завершается неудачно, но этого, похоже, недостаточно, чтобы фактически вызвать план.Добавление цикла for для динамического добавления их в sys.path также не сработало.Вот что печатается в консоли:

Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py", line 32, in main
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
    at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)

org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)

The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.

Вот что я вижу в журналах:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class:    org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
    file: '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887' (valid JAR)
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

Есть ли способ исправить это и сделать соединитель доступным для Python?Я подозреваю, что это проблема Classloader с Jython, но я не знаю, как исследовать дальше (также учитывая, что я не знаю Java).Большое спасибо.

Ответы [ 2 ]

0 голосов
/ 28 июня 2019

У меня в гостевом файле jar может быть встроенный импорт или зависимости, поэтому трех файлов jar недостаточно. Что касается того, как узнать java-зависимые отношения jar, это то, что делает java maven.Вы можете увидеть официальный сайт "Настройка сборки проекта" для помощи.В моем случае я следую официальной настройке Java-проекта, использую "from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer" и добавляю зависимость "org.apache.flink
flink-clients_2.11
1.8.0 "в pom.xml, тогда я могу выводить записи kafka в стандартный вывод с помощью Python API.

0 голосов
/ 11 октября 2018

Вы используете не того потребителя Kafka здесь.В вашем коде это FlinkKafkaConsumer09, но используемая вами библиотека - flink-connector-kafka-0.11_2.11-1.6.1.jar, что для FlinkKafkaConsumer011.Попробуйте заменить FlinkKafkaConsumer09 на FlinkKafkaConsumer011 или использовать файл lib flink-connector-kafka-0.9_2.11-1.6.1.jar вместо текущего.

...