Разделение и объединение потоков в Apache Flink - PullRequest
0 голосов
/ 02 ноября 2018

Я думаю, что у меня довольно нестандартный вариант использования. Я хочу разделить мой исходный поток на несколько потоков с помощью функции filter:

val dataStream:DataStream[MyEvent] = ...
val s1 = dataStream.filter(...).map(...)
val s2 = dataStream.filter(...).map(...)

У меня также есть экстрактор метки времени (к входящим событиям будет прикреплена метка времени в XML):

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...

dataStream.assignTimestampsAndWatermarks(new MyTimestampExtractor)
...

class MyTimestampExtractor extends AssignerWithPunctuatedWatermarks[Elem]
{
  override def checkAndGetNextWatermark(lastElement:Elem, extractedTimestamp:Long):Watermark = new Watermark(extractedTimestamp)
  override def extractTimestamp(element:Elem, previousElementTimestamp:Long):Long = XmlOperations.getDateTime(element, "@timestamp").getMillis
}

Я выбрал этот подход с тех пор, а не просто делаю один поток (val s = dataStream.filter(...).map(...).filter(...).map(...)), потому что я хочу построить сеть , которая разделяет / объединяет произвольные потоки (например, s1 + s2-> c1, s1 + s3-> c2, c2 + s4-> c3, ...)

Теперь при отправке событий через вышеприведенный пример может случиться так, что событие E1 заканчивается как в s1, так и в s2. В моем понимании это означает, что то же самое событие E1 помещается в качестве первого экземпляра в s1 (E1a), а также в качестве второго экземпляра в s2 (E1b).

Итак, что я хочу сделать сейчас, это объединить E1a и E1b в объединенный E1, который напоминает E1, который является одновременно преобразованием s1 и s2.

Я пытался:

val c1 = s1.join(s2)
  .where(_.key).equalTo(_.key)
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .apply((e1a, e2b) => { printf("Got e1a and e1b"); e1a })

Однако кажется, что события никогда не достигают функции применения, и я не могу понять, почему.

Что не так в моем примере? Будет ли мой подход / идея сети таких потоков работать вообще?

1 Ответ

0 голосов
/ 02 ноября 2018

Вы договорились, чтобы там были водяные знаки? При работе со временем события окно будет запускаться только при поступлении водяного знака, который опережает часы времени события за концом окна. Вы делаете это с помощью экстрактора меток времени / генератора водяных знаков; см. пример из документации для получения более подробной информации.

Если один из потоков иногда простаивает, что также может вызвать проблемы, так как отсутствие водяных знаков в свободном потоке будет удерживать водяные знаки для любых потоков, к которым он подключен.

В зависимости от того, что именно вы пытаетесь сделать, вам может оказаться проще использовать функцию CoProcessFunction, чем объединение с временным окном. Для примера ознакомьтесь с упражнениями по обогащению с сохранением состояния и по истечении срока действия с сайта Flink.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...