мигать с питоном, выполнение задания не удалось - PullRequest
1 голос
/ 06 марта 2019

Для первой попытки я хочу прочитать данные JSON из файла и передать их Flink.Я определил источник (который читает строки JSON строка за строкой) и фильтр-заполнитель.См. Код:

from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FilterFunction
import json
import sys

class Json_reader(SourceFunction):
    def readjason(self, ctx):
        sys.stdin = open('capture.json', 'r')
        for line in sys.stdin:
            ctx.collect(json.loads(line))


class Dummy_Filter(FilterFunction):
    def filter(self, value):
        return True

#
# The pipeline definition.
#
def main(factory):
    env = factory.get_execution_environment()
    env.create_python_source(Json_reader()) \
        .filter(Dummy_Filter()) \
        .output()
    env.execute()

Когда я собираю работу и перемещаю ее в свой запущенный Flink-кластер, я получаю следующее сообщение об ошибке:

VirtualBox: / media / sf_Python $./flink-1.7.2/bin/pyflink-stream.sh ./json_parser_flink.py Запуск выполнения программы Не удалось запустить план: null Traceback (последний вызов был последним): файл "", строка 1, в файле "/ tmp/flink_streaming_plan_fbe13c4c-6918-46d4-a4bc-36908a2bea24/json_parser_flink.py ", строка 25, в основном в org.apache.flink.client.program.rest.RestClusterClient.submitJob (RestClusterClient.java) .jj.client.program.ClusterClient.run (ClusterClient.java:487) в org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute (StreamContextEnvironment.java:66) в org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute (StreamExecutionEnvironment.java:1510) в org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute (PythonStreamExecutionEnvironment.java:245) на солнце.reflect.(Method.java:498) org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: Задание не выполнено.(JobID: 31615948194c951be03d46576929aa23)

Программа не содержит задание Flink.Возможно, вы забыли вызвать execute () в среде выполнения.

Я не забыл вызвать execute ().

1 Ответ

1 голос
/ 07 марта 2019

Я нашел проблему. Fast ожидает функцию run () в SourceFunction.

...