Я определил приложение Flink, которое читает из 2 входных потоков, объединяет их и агрегирует события объединенного потока:
// Define first input stream and assign timestamp and watermark for that stream
DataStream<InputEvent> pendingTasks = env.fromCollection(..);
pendingTasks = pendingTasks.assignTimestampAndWatermarks(new EventTimeAndWatermarks());
// Define second input stream
DataStream<InputEvent> completedTasks = env.fromCollection(..);
completedTasks = completedTasks.assignTimestampAndWatermarks(new EventTimeAndWatermarks());
// Define the aggregation logic
DataStream<OutputEvent> stream = pendingTasks.union(completedTasks)
.keyBy(new TaskOwnerSelector())
.process(new ProcessingFunctionAudit())
Моя реализация ProcessingFunctionAudit использует кольцевой буфер для хранения событий. Например, кольцевой буфер может хранить события, которые имеют время события в (х, х + 2 часа). Это означает, что события, поступающие из обоих потоков, должны иметь время события в периоде, охватываемом кольцевым буфером, в противном случае я не могу сохранить их, или мне нужно свернуть кольцевой буфер.
Проблема в том, что Flink, похоже, не ограничивает разницу во времени между событием, поступающим из первого входного потока, и событиями, поступающими из второго входного потока. Возможно, что моя функция получает события из первого потока, которые находятся далеко впереди (во времени события) последнего события, полученного от 2-го потока, что заставляет реализацию выполнить откат кольцевого буфера (для сохранения события), но в будущем буфер будет слишком далеко впереди, когда события из 2-го потока будут доставлены в мою функцию.
Я ищу механизм для ограничения разницы во времени между событиями, поступающими из каждого потока. Если один поток не генерирует события в течение некоторого времени, в идеале Flink не должен доставлять события из другого потока, если они будут длиться более X минут после последнего события, доставленного из другого потока.
В любом случае бесполезно доставлять эти события, так как водяной знак оператора не будет двигаться вперед, пока другой поток не догонит, а движение водяного знака - это то, что запускает оценку состояния в моей реализации.