Привет У меня есть проект Maven для обработки потока Flink.На основании сообщения, полученного из потока, я запускаю пакетный процесс, но в настоящее время получаю сообщение об ошибке.
Я довольно новичок в этом мире флинк, и, пожалуйста, дайте мне знать, если у вас есть идеи.Вот код, который я использую для запуска автономного кластера.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment ( );
KafkaConsumerService kafkaConsumerService= new KafkaConsumerService();
FlinkKafkaConsumer010<String> kafkaConsumer = kafkaConsumerService.getKafkaConsumer(settings );
DataStream<String> messageStream = env.addSource (kafkaConsumer).setParallelism (3);
messageStream
.filter(new MyFilter()).setParallelism(3).name("Filter")
.map(new ProcessFile(arg)).setParallelism(3).name("start batch")
.addSink(new DiscardingSink()).setParallelism(3).name("DiscardData");
env.execute("Stream processor");
// Класс карты ProcessFile
public ProcessFile(String arg) { }
@Override
public String map(String message) throws Exception {
MessageType typedmessage = ParseMessage(message);
if (isWhatIwant()) {
String[] batchArgs = createBatchArgs();
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, jobMasterHost);
config.setInteger(JobManagerOptions.PORT, jobMasterPort);
StandaloneClusterClient client = new StandaloneClusterClient(config);
client.setDetached(true);
PackagedProgram program = new PackagedProgram(new File(jarLocation), SupplyBatchJob.class.getName(), batchArgs);
client.run(program, 7);
}
return typedmessage;
}
Ошибка скопирована с веб-портала диспетчера заданий.Я получаю сообщение об ошибке: org.apache.flink.client.program.ProgramInvocationException: не удалось получить шлюз JobManager.в org.apache.flink.client.program.ClusterClient.runDetached (ClusterClient.java:497) в org.apache.flink.client.program.StandaloneClusterClient.submitJob (StandaloneClusterClient.java:103.f.link.cg)..program.ClusterClient.run (ClusterClient.java:442) в org.apache.flink.client.program.DetachedEnvironment.finalizeExecute (DetachedEnvironment.java:76) в org.apache.flink.client.program.ClusterClient.run (кластер.java: 387) в cw.supply.data.parser.maps.ProcessFileMessage.map (ProcessFileMessage.java:47) в cw.supply.data.parser.maps.ProcessFileMessage.map (ProcessFileMessage.java:25) в орг.apache.flink.streaming.api.operators.StreamMap.processElement (StreamMap.java:41) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.pushToOperator (OperatorChain.java:528) в org.ap.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:503) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:483) в org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:891) в org.apache.flink.streaming.api.operators.AbstractStreamOperator $CountingOutput.collect (AbstractStreamOperator.java:869) в org.apache.flink.streaming.api.operators.StreamFilter.processElement (StreamFilter.java:40) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingO.pushToOperator (OperatorChain.java:528) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:503) в org.apache.flink.streaming.runtime.tasks.OputingChainсобирать (OperatorChain.java:483) в org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:891) в org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOuttor.собирать (AbstractStreamOperator.java:869) в org.apache.flink.streaming.api.operators.StreamSourceContexts $ NonTimestampContext.collect (StreamSourceContexts.java:103) в org.apache.flink.streaming.api.operators.StreamSourceContexts $ NonTimestampContext.collectWithTimestamp (StreamSourceContexts.jstream:1concon.jpg:1concon.org.org или org.kafka.internals.AbstractFetcher.emitRecordWithTimestamp (AbstractFetcher.java:269) в org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord (Kafka010Fetcher.java:86) ссылка на org.apkafka.internal.Kafka09Fetcher.runFetchLoop (Kafka09Fetcher.java:152) в org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase.java:483) ссылка на org.ap.StreamSource.run (StreamSource.java:87) в org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:55) в org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run (SourceStreamTask.java:95) в org.apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:263) в org.apache.flink.runtime.taskmanager.Task.run (Task.java:702) в java.lang.Thread.run (Thread.java:748). Вызывается: org.apache.flink.util.FlinkException: Не удалось подключиться к ведущему JobManager.Пожалуйста, проверьте, что JobManager работает.в org.apache.flink.client.program.ClusterClient.getJobManagerGateway (ClusterClient.java:789) в org.apache.flink.client.program.ClusterClient.runDetached (ClusterClient.java:495) ... еще 30 причин:org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Не удалось получить шлюз лидера.в org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway (LeaderRetrievalUtils.java:79) в org.apache.flink.client.program.ClusterClient.getJobManagerGateway (ClusterClient.java:784: by: bysed: 31 ... bysed by: by ...java.util.concurrent.TimeoutException: время ожидания фьючерса истекло после [10000 миллисекунд] в scala.concurrent.impl.Promise $ DefaultPromise.ready (Promise.scala: 219) в scala.concurrent.impl.Promise $ DefaultPromise.result (Promise.scala: 223) в scala.concurrent.Await $$ anonfun $ result $ 1.apply (package.scala: 190) в scala.concurrent.BlockContext $ DefaultBlockContext $ .blockOn (BlockContext.scala: 53) в scala.concurrent.Await $.result (package.scala: 190) в scala.concurrent.Await.result (package.scala) в org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway (LeaderRetrievalUtils.java:77) ... еще 32