Я испытываю новый потоковый API-интерфейс от Flink и пытаюсь запустить мой скрипт с ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py
.Сценарий python довольно прост, я просто пытаюсь извлечь из существующей темы и отправить все на стандартный вывод (или файл * .out в каталоге журнала, где метод вывода по умолчанию генерирует данные).
import glob
import os
import sys
from java.util import Properties
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.streaming.api.collector.selector import OutputSelector
from org.apache.flink.api.common.serialization import SimpleStringSchema
directories=['/home/user/flink/flink-1.6.1/lib']
for directory in directories:
for jar in glob.glob(os.path.join(directory,'*.jar')):
sys.path.append(jar)
from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09
props = Properties()
config = {"bootstrap_servers": "localhost:9092",
"group_id": "flink_test",
"topics": ["TopicCategory-TopicName"]}
props.setProperty("bootstrap.servers", config['bootstrap_servers'])
props.setProperty("group_id", config['group_id'])
props.setProperty("zookeeper.connect", "localhost:2181")
def main(factory):
consumer = FlinkKafkaConsumer09([config["topics"]], SimpleStringSchema(), props)
env = factory.get_execution_environment()
env.add_java_source(consumer) \
.output()
env.execute()
Я взял несколько файлов jar из репозиториев Maven, а именно flink-connector-kafka-0.9_2.11-1.6.1.jar
, flink-connector-kafka-base_2.11-1.6.1.jar
и kafka-clients-0.9.0.1.jar
, и скопировал их в каталог Flink lib
.Если я не неправильно понял документацию, этого должно хватить, чтобы Flink загрузил разъем kafka.Действительно, если я удаляю любую из этих банок, импорт завершается неудачно, но этого, похоже, недостаточно, чтобы фактически вызвать план.Добавление цикла for для динамического добавления их в sys.path
также не сработало.Вот что печатается в консоли:
Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py", line 32, in main
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)
The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.
Вот что я вижу в журналах:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
file: '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887' (valid JAR)
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Есть ли способ исправить это и сделать соединитель доступным для Python?Я подозреваю, что это проблема Classloader с Jython, но я не знаю, как исследовать дальше (также учитывая, что я не знаю Java).Большое спасибо.