Присоединяйтесь к 2 неограниченным Pcollections на ключ - PullRequest
2 голосов
/ 15 апреля 2019

Я пытаюсь объединить две неограниченные PCollection, которые я получаю из двух разных тем кафки на основе ключа.

В соответствии с документами и другими блогами объединение может быть возможно только в том случае, если мы выполняем управление окнами. Окно собирает сообщения из обоих потоков в определенном окне и присоединяет его. Что не то, что мне нужно.

Ожидаемый результат - в одном потоке сообщения приходят с очень низкой частотой, а из другого потока мы получаем сообщения с высокой частотой. Я хочу, чтобы, если значение ключа не поступило в оба потока, мы не будем выполнять соединение до тех пор, пока оно не поступит, выполнить соединение. Возможно ли использовать текущую парадигму луча?

Ответы [ 2 ]

2 голосов
/ 16 апреля 2019

Короче говоря, лучшее решение - использовать DoFn с сохранением состояния в Beam. Вы можете иметь состояние для каждого ключа (и для каждого окна, которое в вашем случае является глобальным окном). Вы можете сохранить события одного потока в состоянии, и как только события из другого потока появятся с тем же ключом, присоедините его к событиям в состоянии. Вот ссылка [1].

Однако, короткий ответ не использует истинную силу модели Луча. Модель Beam предоставляет способы баланса между задержкой, стоимостью и точностью. Предоставляет простой API, чтобы скрыть сложную потоковую обработку.

Почему я это говорю? Давайте вернемся к решению краткого ответа: с состоянием DoFn. В подходе DoFn с учетом состояния вам не хватает способов ответить на следующие вопросы:

  • Что если вы буферизировали события 1M для одного ключа, и в другом потоке события по-прежнему не появлялись? Вам нужно опустошить государство? Что если событие появится сразу после опустошения штата?
  • Если в конце концов есть одно событие, которое, по-видимому, завершает JOIN, приемлема ли стоимость буферизации 1M событий для JOIN одного события из другого потока?
  • Как обработать позднюю дату в обоих потоках? Скажем, вы присоединились к <1, a> из левого потока <1, b> из правого потока. Позже есть еще один <1, c> из левого потока, откуда вы знаете, что вам нужно только испустить <1, <c, b>>, предположим, что это инкрементный режим для вывода результата. Если вы начинаете буферизовать те уже присоединенные события, чтобы получить дельту, это действительно становится слишком сложным для программиста.

Управление лучами, управление триггерами, уточнение выходных данных, управление водяными знаками и задержкой SLA предназначены для того, чтобы скрыть от вас следующие сложности:

  • водяной знак: указывает, когда окна завершены, так что события не будут продолжаться долго (и дальнейшие события будут рассматриваться как поздние данные)
  • Lateness SLA control: контролируйте время, когда вы кэшируете данные для объединения.
  • уточнение выходных данных: корректно обновлять вывод, если разрешены новые события.

Хотя модель Beam хорошо спроектирована. В реализации модели Beam отсутствуют важные функции для поддержки описанного вами объединения:

  • Оконное управление недостаточно гибкое, чтобы поддерживать ваш случай, когда потоки имеют огромные разные частоты (поэтому фиксированное и скользящее окно не подходит). Кроме того, вы также не знаете частоту поступления потоков (поэтому окно сеанса на самом деле не подходит, так как вы должны задать промежуток между окнами сеанса).
  • отсутствует ретракция, так что вы не можете уточнить вывод, как только появятся поздние события.

В заключение, модель Beam разработана для обработки сложных потоковых данных, которые идеально соответствуют вашим потребностям. Но реализация недостаточно хороша, чтобы позволить вам использовать ее сейчас, чтобы завершить ваш вариант использования соединения.

[1] https://beam.apache.org/blog/2017/02/13/stateful-processing.html

0 голосов
/ 16 апреля 2019

Сегодня это не очень хорошо поддерживается моделью Beam, но есть несколько способов сделать это. В этих примерах предполагается, что каждый ключ появляется в каждом потоке ровно один раз, если это не так, вам нужно настроить их.

Один из вариантов - использовать Глобальное окно и Stateful DoFn вместо соединения. Глобальное окно эффективно отключает управление окнами. DoFn с отслеживанием состояния позволяет хранить данные о ключе, который вы обрабатываете, в «ячейке состояния» для дальнейшего использования. Когда вы получаете запись, вы проверяете значение в ячейке состояния. Если вы найдете его, выполните объединение, введите значение и очистите состояние. Если ничего нет, сохраните текущее значение.

Другой вариант - использовать Windows Session и Join. Окно сеанса «GapDuration» фактически является тайм-аутом для данного ключа. Это работает до тех пор, пока у вас есть ограничение по времени, в течение которого вы увидите Ключ в обоих потоках. Вам также нужно настроить триггер счетчика элементов «AfterPane.elementCountAtLeast (2)», чтобы вам не приходилось ждать полного тайм-аута после просмотра второго фрагмента данных.

...