Apache Flink, второй этап суммирования на оконном потоке - PullRequest
0 голосов
/ 04 июля 2018

У меня есть события DataStream of Kafka (соответствующие показаниям с устройства), поданные в следующий код, который выдает среднее значение по каждому устройству для чтения в скользящем окне. Это отлично работает. Затем я хочу вычислить сумму всех средних значений для каждого устройства в одном и том же окне, это та часть, которую я не могу синтаксически правильно выразить.

Эта часть работает:

val stream = env
    // words is our Kafka topic
    .addSource(kafkaConsumer)
    // configure timestamp and watermark assigner
    .assignTimestampsAndWatermarks(new DeviceTSAssigner)
      .keyBy(_.deviceIdFull)
      .timeWindow(Time.minutes(5), Time.minutes(1))
    /* count events in window */
      .apply{ (key: String, window: TimeWindow, events: Iterable[DeviceData], out: Collector[(String, Long, Double)]) =>
        out.collect( (key, window.getEnd, events.map(_.currentReading).sum/events.size))
    }

  stream.print()

Вывод что-то вроде

(device1,1530681420000,0.0)
(device2,1530681420000,0.0)
(device3,1530681480000,0.0)
(device4,1530681480000,0.0)
(device5,1530681480000,52066.0)
(device6,1530681480000,69039.0)
(device7,1530681480000,79939.0)
... 
...

Следующий код - часть, с которой у меня проблемы, я не уверен, как именно это кодировать, но я думаю, что это должно быть примерно так:

  val avgStream = stream
    .keyBy(2) // 2 represents the window.End from stream, see code above
    .timeWindow(Time.minutes(1)) // tumbling window
    .apply { (
               key: Long,
               window: TimeWindow,
               events: Iterable[(String, Long, Double)],
               out: Collector[(Long, Double)]) =>
      out.collect( (key, events.map( _._3 ).sum ))
    }

Я получаю следующие ошибки при компиляции этого кода ..

Error:(70, 52) type mismatch;
 found   : (Long, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(String, Long, Double)], org.apache.flink.util.Collector[(Long, Double)]) => Unit
 required: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(String, Long, Double)], org.apache.flink.util.Collector[?]) => Unit
                   out: Collector[(Long, Double)]) =>

Я пробовал и другие варианты, такие как использование AggregtionFunctions, но не смог пройти компиляцию. Судя по ошибке, мне нужно преобразовать элементы входного потока в кортежи, я видел некоторый код, но не совсем уверен, как это сделать. Я совершенно новичок в Scala, поэтому я думаю, что это главная проблема, потому что то, что я хочу сделать, не сложно.

Обновлено 07.04 2018

Я думаю, что у меня есть решение для моего вопроса, кажется, работает нормально, но я все еще хотел бы держать это открытым, надеясь, что кто-то еще может прокомментировать его (вопрос, а также мое решение).

По сути, я удалил первое поле (выполнив карту), которое было названием устройства, поскольку оно нам не нужно, а затем keyBy на отметке времени (из предыдущего этапа), оконные события в падающее окно и затем просто сложите 2-е поле (на основе индекса 1, 0), которое является средним показателем предыдущего этапа.

val avgStream = stream
          .map(r => (r._2, r._3))
         .keyBy(0)
         .timeWindowAll(Time.minutes(1))
         .sum(1)
         .print()

1 Ответ

0 голосов
/ 05 июля 2018

Я смог ответить на свой собственный вопрос, поэтому подход, описанный выше (см. Обновление от 07/04/2018), работает, но лучший способ сделать это (особенно, если вы хотите сделать это не только для одного поля в поток, но несколько) должен использовать функцию AggregateFunction. Я пытался сделать это и раньше, но столкнулся с проблемами из-за отсутствия шага «карта».

Как только я отобразил поток на втором этапе, чтобы извлечь соответствующие области интереса, я мог бы использовать функцию AggregateFunction.

Документация Flink здесь и эта ссылка на github оба служат примером для этого. Я начал с примера документации Flink, потому что его было очень просто понять, а затем преобразовал мой код, чтобы он выглядел больше как пример github.

...