Запуск нескольких вариантов использования потоковых данных - PullRequest
0 голосов
/ 25 мая 2020

Я получаю потоковые данные датчиков от Kafka, и мне нужно сделать следующее:

a. Проверьте изменения переменной в течение периода времени и, если они экстремальные, поднимите тревогу (например, колебания температуры в течение 5-минутного периода от очень низкого до очень высокого). Я проверяю, очень ли у меня температура, устанавливаю таймер на 5 минут и наблюдаю, получаю ли я очень высокую температуру в течение 5 минут, и если да, я поднимаю будильник

b. В качестве альтернативы, если у меня высокая температура, я проверяю, получаю ли я очень низкую температуру в 5-минутном окне

c. вычислить текущее среднее значение температуры каждые 1 мин и pu sh to kafka. Это непрерывная деятельность, и мне нужен таймер с самого начала, который запускается каждые 1 минуту на инкрементной основе

Я не могу понять, как использовать таймер для всех этих вариантов использования в тот же код. Любые предложения / советы.

Ответы [ 2 ]

1 голос
/ 25 мая 2020
Время вращения

Flink windows хорошо подходит для случая c, но не так хорошо подходит для случаев a и b. Это связано с тем, что эти windows не могут быть выровнены по запускающему событию - вместо этого они всегда выровнены по часам (например, с 12:00 до 12:05). Таким образом, если событие высокой температуры происходит в 12:04, а затем сильно падает к 12:06, эти два события будут разными windows.

Для a и b я бы вместо этого предложил использовать интервальное соединение из DataStream API или соединение с временным окном с использованием Table или SQL API . Что-то вроде

SELECT *
FROM events e1, events e2
WHERE e1.id = e2.id AND
      e2.time BETWEEN e1.time AND e1.time + INTERVAL '5' MINUTE AND
      ABS(e1.temp - e2.temp) > 50

Если вам действительно нужно объединить эти три случая вместе, я бы предложил использовать KeyedProcessFunction - но это будет намного сложнее, особенно если вам нужно беспокоиться о событиях. прибытие не по порядку. И вам понадобится более одного таймера, поскольку минутные интервалы не синхронизируются c с 5-минутными.

0 голосов
/ 25 мая 2020

Итак, Flink имеет концепцию Windows Подробнее здесь .

Существует также концепция потоковых и неключевых потоков, которые также помогут вам, если вы хотите разделить данные перед обработкой Подробнее здесь .

  • Для a & b -> я предлагаю вам использовать простой 5-минутный интервал TumblingEventTimeWindows и написать свой собственный logi c в методе process. Итак, каждые 5 минут вы обрабатываете элементы вместе, что вы в конечном итоге и хотите, я думаю.
  • Для c -> Я снова предлагаю использовать простой TumblingEventTimeWindows 1 минуту и ​​рассчитывать среднюю температуру, используя reduce метод

В приведенном ниже коде предполагается, что поток равен non-keyed с ProcessingTime TimeCharacteristi c.


        // By Default TimeCharacteristic is ProcessingTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // for a & b 
        data
                .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
                .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<String> iterable, Collector<String> collector) throws Exception {
                        for(String elements: iterable){
                            // custom logic here

                                ....
                            // collect your result
                            collector.collect(elements);
                        }
                    }
                })
                .addSink(Push it to whatever Sink);

        // For c
        data
                .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
                .reduce(new ReduceFunction<String>() {
                    @Override
                    public String reduce(String s, String t1) throws Exception {
                        // Calculate Avg
                        return avg;
                    }
                })
                // Add to Kafka
                .addSink(new FlinkKafkaProducer<>(topic,new SimpleStringSchema,kafkaProperties);






...