Как Flink обрабатывает водяные знаки с операторами Union? - PullRequest
0 голосов
/ 25 февраля 2020

Я читаю данные из четырех потоков Kinesis. Данные в каждом потоке имеют разные типы данных. После считывания всех четырех потоков я назначаю временные метки и водяные знаки и собираю данные из каждого потока. Все результаты четырех агрегаций выводятся с использованием одного и того же объекта generi c. Я хочу объединить результаты из четырех потоков, чтобы я мог отправить объединенный поток в одну ProcessFunction. По сути, это позволило бы мне использовать ProcessFunction как CoProcessFunction, но я мог бы иметь дело с данными из более чем двух потоков (в этом случае ProcessFunction получит агрегации из всех четырех отдельных потоков).

Однако меня беспокоит то, что это может плохо сочетаться с водяными знаками. Если одному потоку требуется больше времени для обработки или он каким-то образом отстает, его агрегация может не попасть в функцию процесса, если все водяные знаки передаются в объединении и один из потоков опережает другие. Если это так, то водяной знак функции процесса будет максимальным из водяных знаков, которые он видит из четырех отдельных потоков.

Мой вопрос такой: Как обрабатываются водяные знаки в операторах объединения и как обрабатывают ли операторы после объединения эти водяные знаки?

Дополнительно: Если объединение объектов generi c не работает из-за проблем с водяными знаками, каков наилучший способ объединения результатов? из четырех различных агрегаций, когда Flink поддерживает только CoProcessFunction для двух потоков?

Ответы [ 2 ]

1 голос
/ 25 февраля 2020

Другой способ соединить более двух потоков - это построить дерево, которое выполняет попарные соединения, пока все потоки не будут объединены. Либо в виде сбалансированного дерева, например:

A--->
     A+B---->
B--->

            A+B+C+D------------>

C--->
     C+D---->
D--->

, либо путем добавления по одному потоку за раз, например:

a--->
     a+b--->
b--->
            a+b+c--->
     c----->
                     a+b+c+d--->
            d------->

FWIW, FLIP-92 это предложение добавить n-арный оператор потока в Flink, но даже если он будет реализован, он, вероятно, не будет виден пользователю, по крайней мере, на первый взгляд.

1 голос
/ 25 февраля 2020

Водяной знак с Union работает так же, как водяной знак с параллельными потоками. Это означает, что водяной знак всегда является min водяных знаков из всех входных потоков. То же самое относится к последующим операторам, они водяные знаки будут min всех входных потоков.

Если честно, я не думаю, что объединение каким-либо образом зависит от водяных знаков. Но если Вы по какой-либо причине хотите использовать функцию CoProcessFunction, я могу предложить этот несколько хакерский способ. Вы можете создать Seq сгенерированных вами потоков, а затем:

//Streams defined
val seq = Seq(stream, stream2, stream3, stream4)
seq.reduce((stream1, stream2) => stream1.connect(stream2).process(...))
...