События Flink CEP, присоединенные к обратному потоку данных - PullRequest
0 голосов
/ 25 октября 2018

У меня есть 2 потока данных (например)

ts | device | custId | temp
1 | 'device1'| 1 | 10
1 | 'device2'| 4 | 7
2 | 'device1'| 1 | 10
3 | 'device1'| 1 | 10
4 | 'device1'| 1 | 10
5 | 'device2'| 4 | 10

Я создал шаблон CEP, в котором я хочу проверить, не превышает ли температура в течение 4 секунд значение 30.

val pattern = Pattern.begin[Device]("start")
      .where(_.sumtemp >= 30)
      .within(Time.seconds(4))

Есть ли способ соединить выходные данные этого потока шаблона с другим входящим потоком данных, чтобы получить ниже?

ts | custId | morethanthiry
1 | 1 | yes
2 | 4 | no

Буду очень признателен, если вы поделитесь примером для этого.

1 Ответ

0 голосов
/ 25 октября 2018

Существует более одного варианта.Вы можете объединить свои потоки с помощью coGroup

Пример:

set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction());

Вы можете рассматривать это как объединение в SQL.

Небольшой пример для реализации:

class MyCoGroupFunction extends RichCoGroupFunction[DataTypeOfStream1, DataTypeOfStream2, DataTypeOfOutput] {

      override def coGroup(first: DataTypeOfStream1,
                         second: DataTypeOfStream2],
                         out: DataTypeOfOutput): Unit = {

           out.collect(...)
           //your output

      }
}

При необходимости вы также можете использовать состояние.

Существуют также другие варианты объединения двух потоков, например

  • union (еслиподключенные потоки имеют одинаковый тип данных)
  • connect
  • coFlatMap Различия между методами незначительны.

См. https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/ для получения дополнительной информации.

...