Образец флинка становится проблемой с Arralist в коде предупреждения? - PullRequest
0 голосов
/ 27 февраля 2019

Я следовал этому примеру и реализовал с такими же образцами данных kafka json.

Данные потребительских образцов {"temperature" : 28,"machineName":"xyz"}

DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern)
        .flatSelect(new PatternFlatSelectFunction<TemperatureEvent, Alert>() {
            private static final long serialVersionUID = 1L;


        @Override
        public void flatSelect(Map<String, List<TemperatureEvent>> event, Collector<Alert> out) throws Exception {
            new Alert("Temperature Rise Detected:" + ((TemperatureEvent) event.get("first")).getTemperature()
                    + " on machine name:" + ((MonitoringEvent) event.get("first")).getMachineName());

        }

Теперь у меня проблема с ArrayListcast

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at Test.KafkaApp.main(KafkaApp.java:61)

Причина: java.lang.ClassCastException: java.util.ArrayList не может быть приведен к Test.TemperaEvent в Test.KafkaApp $ 2.flatSelect (KafkaApp.java:53) в org.apache.flink.cep.operator.processElement (AbstractKeyedCEPPatternOperator.java:198) в org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput (StreamInputProcessor.java:202) в org.apache.flink.streaming.runtime.tasks.OneInputStunask.Java: 105) в org.apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run (Task.java:704) в java.lang.Thread.run (неизвестный источник)

1 Ответ

0 голосов
/ 28 февраля 2019

Ваш код содержит две проблемы:

  • Прежде всего flatSelect получает Map<String, List<TemperatureEvent>>.Это означает, что вы можете получить несколько TemperatureEvents за шаблон.Таким образом, вы должны выбрать, какой вы хотите.
  • Вы не добавляете Alerts к Collector<Alert>.Функция плоской карты не возвращает значения, но выводит их через Collector<Alert>

Без компиляции, я думаю, что это должно сработать

DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern)
    .flatSelect(
    new PatternFlatSelectFunction<TemperatureEvent, Alert>() {
            private static final long serialVersionUID = 1L;

        @Override
        public void flatSelect(Map<String, List<TemperatureEvent>> event, Collector<Alert> out) throws Exception {
            TemperatureEvent temperatureEvent = event.get("first").get(0);
            out.collect(new Alert("Temperature Rise Detected:" + temperatureEvent.getTemperature() + " on machine name:" + temperatureEvent.getMachineName()));
        }
       });

Кстати, связанныйкод из репозитория O'Reilly не скомпилируется с Flink.PatternSelectFunction имеет неправильную подпись.

...