Я думаю, что у меня довольно нестандартный вариант использования. Я хочу разделить мой исходный поток на несколько потоков с помощью функции filter
:
val dataStream:DataStream[MyEvent] = ...
val s1 = dataStream.filter(...).map(...)
val s2 = dataStream.filter(...).map(...)
У меня также есть экстрактор метки времени (к входящим событиям будет прикреплена метка времени в XML):
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...
dataStream.assignTimestampsAndWatermarks(new MyTimestampExtractor)
...
class MyTimestampExtractor extends AssignerWithPunctuatedWatermarks[Elem]
{
override def checkAndGetNextWatermark(lastElement:Elem, extractedTimestamp:Long):Watermark = new Watermark(extractedTimestamp)
override def extractTimestamp(element:Elem, previousElementTimestamp:Long):Long = XmlOperations.getDateTime(element, "@timestamp").getMillis
}
Я выбрал этот подход с тех пор, а не просто делаю один поток (val s = dataStream.filter(...).map(...).filter(...).map(...)
), потому что я хочу построить сеть , которая разделяет / объединяет произвольные потоки (например, s1 + s2-> c1, s1 + s3-> c2, c2 + s4-> c3, ...)
Теперь при отправке событий через вышеприведенный пример может случиться так, что событие E1 заканчивается как в s1, так и в s2. В моем понимании это означает, что то же самое событие E1 помещается в качестве первого экземпляра в s1 (E1a), а также в качестве второго экземпляра в s2 (E1b).
Итак, что я хочу сделать сейчас, это объединить E1a и E1b в объединенный E1, который напоминает E1, который является одновременно преобразованием s1 и s2.
Я пытался:
val c1 = s1.join(s2)
.where(_.key).equalTo(_.key)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply((e1a, e2b) => { printf("Got e1a and e1b"); e1a })
Однако кажется, что события никогда не достигают функции применения, и я не могу понять, почему.
Что не так в моем примере? Будет ли мой подход / идея сети таких потоков работать вообще?