Как инициализировать состояние вещания Flink? - PullRequest
0 голосов
/ 18 февраля 2019

Мы пытаемся создать сценарий использования, в котором данные из потока проходят через формулу вычисления, но сама формула также должна (редко) обновляться.Из прочтения документации мне кажется, что состояние вещания Flink будет естественным для такого случая.

В качестве эксперимента я создал упрощенную версию: предположим, у меня есть поток целых чисел и второй поток, содержащий коэффициенты умножения для этих целых чисел (куда я могу отправлять значения по желанию).Второй поток очень низкой частоты, может легко быть в порядке дней или недель между событиями.На данный момент они оба реализованы как простые сокет-серверы, конечный продукт будет использовать Kafka.

В моем примере приложения все это работает, но у меня остается одна проблема: что происходит, когда система запускается и ничего не имеетеще не было в эфире?Откуда я могу получить коэффициент по умолчанию (или последний использованный)?В моем примере я решил это путем жесткого кодирования значения, но это не то, что я мог бы использовать.

В моем экспериментальном проекте я немного озадачен этим, поскольку {processElement} получает только состояние широковещательной передачи только для чтения, но processBroadcastElement не будет вызываться до тех пор, пока не появится обновление, которое может занять много времени.время.Мой план состоял в том, чтобы сохранить формулы, используемые в базе данных, и каким-то образом прочитать их, когда задание (пере) начнется, но я не нашел способа сделать эту работу.Приветствуются любые предложения от более знающих людей, это мой первый проект Flink, поэтому я пытаюсь найти свой путь.

Рабочий пример здесь: https://github.com/tonvanbart/flink-broadcast-example/tree/mapstate-attempt Код Flink находится вкласс BroadcastState.

Заранее спасибо.

1 Ответ

0 голосов
/ 19 февраля 2019

Если система перезапускается с контрольной точки / точки сохранения, то у вас есть последний фактор, который был передан (через состояние), верно?Поэтому я предполагаю, что проблема заключается в том, что делать, когда он изначально запускается.

Если это так, то это общая проблема с шаблоном, который вы используете, когда вы фактически хотите блокировать поток целых чисел, пока выВы получили начальное значение из широковещательного потока.

В настоящее время общим решением является буферизация целочисленного потока в вашем операторе (используя состояние), пока вы не получите это начальное значение, но это может привести к неограниченному состоянию в зависимости ото том, как быстро приходят целые числа и как долго вы должны ждать.

Еще можно попробовать обернуть ваш целочисленный источник (сделать его делегатом) и не выдавать никаких значений, пока вы не узнаете, чточто-то было передано.Например, сделать то, что транслируется в запрашиваемое состояние, и выполнять периодическую проверку, пока состояние не существует.

...