Запуск пакетного процесса из потокового задания - PullRequest
0 голосов
/ 02 июня 2018

Привет У меня есть проект Maven для обработки потока Flink.На основании сообщения, полученного из потока, я запускаю пакетный процесс, но в настоящее время получаю сообщение об ошибке.

Я довольно новичок в этом мире флинк, и, пожалуйста, дайте мне знать, если у вас есть идеи.Вот код, который я использую для запуска автономного кластера.

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment ( );

    KafkaConsumerService kafkaConsumerService= new KafkaConsumerService();
    FlinkKafkaConsumer010<String> kafkaConsumer = kafkaConsumerService.getKafkaConsumer(settings );
    DataStream<String> messageStream = env.addSource (kafkaConsumer).setParallelism (3);

    messageStream
            .filter(new MyFilter()).setParallelism(3).name("Filter")
            .map(new ProcessFile(arg)).setParallelism(3).name("start batch")
            .addSink(new DiscardingSink()).setParallelism(3).name("DiscardData");

    env.execute("Stream processor");

// Класс карты ProcessFile

    public ProcessFile(String arg) { }

@Override
public String map(String message) throws Exception {
    MessageType typedmessage = ParseMessage(message);
    if (isWhatIwant()) {
        String[] batchArgs = createBatchArgs();
                    Configuration config = new Configuration();
        config.setString(JobManagerOptions.ADDRESS, jobMasterHost);
        config.setInteger(JobManagerOptions.PORT, jobMasterPort);

        StandaloneClusterClient client = new StandaloneClusterClient(config);
        client.setDetached(true);
        PackagedProgram program = new PackagedProgram(new File(jarLocation), SupplyBatchJob.class.getName(), batchArgs);
        client.run(program, 7);
    }

    return typedmessage;
}

Ошибка скопирована с веб-портала диспетчера заданий.Я получаю сообщение об ошибке: org.apache.flink.client.program.ProgramInvocationException: не удалось получить шлюз JobManager.в org.apache.flink.client.program.ClusterClient.runDetached (ClusterClient.java:497) в org.apache.flink.client.program.StandaloneClusterClient.submitJob (StandaloneClusterClient.java:103.f.link.cg)..program.ClusterClient.run (ClusterClient.java:442) в org.apache.flink.client.program.DetachedEnvironment.finalizeExecute (DetachedEnvironment.java:76) в org.apache.flink.client.program.ClusterClient.run (кластер.java: 387) в cw.supply.data.parser.maps.ProcessFileMessage.map (ProcessFileMessage.java:47) в cw.supply.data.parser.maps.ProcessFileMessage.map (ProcessFileMessage.java:25) в орг.apache.flink.streaming.api.operators.StreamMap.processElement (StreamMap.java:41) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.pushToOperator (OperatorChain.java:528) в org.ap.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:503) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:483) в org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:891) в org.apache.flink.streaming.api.operators.AbstractStreamOperator $CountingOutput.collect (AbstractStreamOperator.java:869) в org.apache.flink.streaming.api.operators.StreamFilter.processElement (StreamFilter.java:40) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingO.pushToOperator (OperatorChain.java:528) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:503) в org.apache.flink.streaming.runtime.tasks.OputingChainсобирать (OperatorChain.java:483) в org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:891) в org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOuttor.собирать (AbstractStreamOperator.java:869) в org.apache.flink.streaming.api.operators.StreamSourceContexts $ NonTimestampContext.collect (StreamSourceContexts.java:103) в org.apache.flink.streaming.api.operators.StreamSourceContexts $ NonTimestampContext.collectWithTimestamp (StreamSourceContexts.jstream:1concon.jpg:1concon.org.org или org.kafka.internals.AbstractFetcher.emitRecordWithTimestamp (AbstractFetcher.java:269) в org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord (Kafka010Fetcher.java:86) ссылка на org.apkafka.internal.Kafka09Fetcher.runFetchLoop (Kafka09Fetcher.java:152) в org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase.java:483) ссылка на org.ap.StreamSource.run (StreamSource.java:87) в org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:55) в org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run (SourceStreamTask.java:95) в org.apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:263) в org.apache.flink.runtime.taskmanager.Task.run (Task.java:702) в java.lang.Thread.run (Thread.java:748). Вызывается: org.apache.flink.util.FlinkException: Не удалось подключиться к ведущему JobManager.Пожалуйста, проверьте, что JobManager работает.в org.apache.flink.client.program.ClusterClient.getJobManagerGateway (ClusterClient.java:789) в org.apache.flink.client.program.ClusterClient.runDetached (ClusterClient.java:495) ... еще 30 причин:org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Не удалось получить шлюз лидера.в org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway (LeaderRetrievalUtils.java:79) в org.apache.flink.client.program.ClusterClient.getJobManagerGateway (ClusterClient.java:784: by: bysed: 31 ... bysed by: by ...java.util.concurrent.TimeoutException: время ожидания фьючерса истекло после [10000 миллисекунд] в scala.concurrent.impl.Promise $ DefaultPromise.ready (Promise.scala: 219) в scala.concurrent.impl.Promise $ DefaultPromise.result (Promise.scala: 223) в scala.concurrent.Await $$ anonfun $ result $ 1.apply (package.scala: 190) в scala.concurrent.BlockContext $ DefaultBlockContext $ .blockOn (BlockContext.scala: 53) в scala.concurrent.Await $.result (package.scala: 190) в scala.concurrent.Await.result (package.scala) в org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway (LeaderRetrievalUtils.java:77) ... еще 32

1 Ответ

0 голосов
/ 04 июня 2018

Я понял, в чем проблема после получения доступа к среде, которую я проверил.Я использовал публичный адрес JobManager, где порт не открыт.Вместо этого я начал использовать частный IP, так как все узлы находятся в одной подсети, и нет необходимости открывать порт для мира.Надеюсь, это поможет кому-то еще.

...