Apache Beam Combine vs GroupByKey - PullRequest
       154

Apache Beam Combine vs GroupByKey

0 голосов
/ 09 мая 2020


Итак, я столкнулся с этой, кажется, классической c -проблемой, извлеките временные вершины для неограниченного потока,
используя Apache Beam (Flink в качестве движка):

Предполагается, что сайты + вводимые кортежи совпадений:
{"aaa.com", 1001}, {"bbb.com", 21}, {"aaa.com", 1002}, {"ccc .com ", 3001}, {" bbb.com ", 22} ....
(Ожидаемая скорость: + 100K записей в час)

Цель состоит в том, чтобы выводить сайты, которые составляют> 1% от общего числа попаданий в каждый 1 час.
т.е. для 1-часового окна исправлений выберите сайт, который в сумме составляет> 1% от общего числа совпадений.

Итак, сначала, сумма по ключу:
{"aaa.com", 2003}, {"bbb.com", 43}, {"ccc .com", 3001} ....
И, наконец, выведите> 1%:
{"aaa.com"}, {"ccc .com"}

Альтернатива:

1) Group + parDo :
Фиксированное временное окно 1 час, сгруппировать все элементы, следуя повторяемому parDo для всех элементов окна,
вычислить сумму и вывести сайты> 1%.
Кажется, против быть все агро ess выполняется в одном потоке и, похоже, требует двойных итераций, чтобы получить сумму и получить> 1%.

2) GroupByKey + Combine
Фиксированное временное окно 1 час, GrouByKey с использованием ключа = Сайт, применяющий Объединение с настраиваемым аккумулятором для суммирования совпадений для каждой клавиши.

Хотя вариант «Объединить» (# 2) кажется более подходящим,
мне не хватает части , попадающей в сумму за -1-часовое окно, , необходимое для вычисления элементов>% 1:
Можно ли использовать одно и то же окно для двух комбинаций: одно для каждой клавиши и одна общая сумма совпадений в этом окне?
и как лучше всего их объединить, чтобы сделать вызов> 1% на элемент?

10x

1 Ответ

0 голосов
/ 11 мая 2020

Вы можете сделать это через боковые входы. Например, вы бы сделали что-то вроде этого (код в Python, но ответ для Java аналогичен):


input_data = .... # ("aaa.com", 1001), ("bbb.com", 21), ("aaa.com", 1002), ("ccc.com", 3001), ("bbb.com", 22) ....

total_per_key = input_data | beam.CombinePerKey(sum)

global_sum_per_window = beam.pvalue.AsSingleton(
    input_data
    | beam.Values()
    | beam.CombineGlobally(sum).without_defaults())

def find_more_than_1pct(elem, global_sum):
  key, value = elem
  if value > global_sum * 0.01:
    yield elem

over_1_pct_keys = total_per_key | beam.FlatMap(find_more_than_1pct)

В этом случае global_sum_per_window PCollection будет иметь одно значение для каждое окно, а total_per_key будет иметь одно значение для каждой клавиши на окно.

Сообщите мне, если это сработает!

...