Обработка «обновления состояния» в Flink ConnectedStream - PullRequest
0 голосов
/ 28 января 2019

Мы создаем приложение, которое имеет два потока:

  1. Поток сообщений большого объема
  2. Большой статический поток (происходящий из некоторых паркетных файлов, которые у нас лежат)который мы вводим во Flink, чтобы перевести этот набор данных в сохраненное состояние

Мы хотим соединить два потока, чтобы получить общее состояние, чтобы 1-й поток мог использовать второе состояние для обогащения.

Каждый день или около того обновляются файлы паркета (источник 2-го потока), и для этого потребуется очистить состояние 2-го потока и перестроить его (вероятно, это займет около 2 минут).

Вопрос в том, можем ли мы блокировать / задерживать сообщения из 1-го потока во время работы этого процесса?

Спасибо.

Ответы [ 2 ]

0 голосов
/ 31 января 2019

Похоже, ваш случай похож на Flip-23 , в котором рассматривается обслуживание моделей в Apache Flink.

Я думаю, что все сводится к тому, как (и если) ваша статикаПоток имеет ключ:

  • Если он введен так же, как ваши быстрые данные, вы можете набрать оба потока, соединить их и получить доступ к контексту ввода.
  • если события статического потока не вводятся аналогичным образом, возможно, вам следует рассмотреть возможность генерирования управляющих событий, которые приведут к обновлению этих статических файлов из внешнего источника (например, s3).Это легче сказать, чем сделать, поскольку нет тривиального способа гарантировать, что все параллельные экземпляры вашего быстрого потока получат управляющее событие.Вы можете использовать ListState в качестве буфера, хотя доступ к нему зависит от формы ваших данных.

Это может помочь, если вы поделитесь немного большей информацией о форме ваших данных(Например, вы присоединяетесь к ключу? Вы просто служите модели? Другой?).

0 голосов
/ 30 января 2019

К сожалению, в настоящее время нет прямого / простого способа заблокировать один поток в другом.Типичное решение состоит в том, чтобы буферизовать поток загрузки, пока вы загружаете (или повторно загружаете) поток обогащения.

Один из подходов, который вы можете попробовать, - это обернуть ваш поток загрузки в пользовательский SourceFunction, который знает, когда не следуетгенерировать данные, основываясь на каком-то внешнем триггере (это тот же сигнал, который вы использовали бы, чтобы знать, что у вас есть данные Parquet для повторной загрузки).

...