Apache Количество лучей уникальных элементов - PullRequest
0 голосов
/ 29 мая 2020

У меня есть задание Apache Beam, которое загружает данные из PubSub, а затем загружает их в BigQuery, я преобразовываю сообщение PubSub в pojo с полями

id, name, count

Count означает количество неуникальных элементов в одной загрузке.

Если я загружаю из PubSub 3 элемента, два из которых одинаковы, то мне нужно загрузить в BigQuery 2 элементы, один из них будет есть счетчик 2.

Интересно, как легко сделать это в Apache Beam. Я пытался сделать это через DoFn или MapElements, но там я могу обрабатывать только один элемент. Я также пытался преобразовать элемент в KV, а затем посчитать, но у меня есть кодер без детерменистики.

В обычном приложении java я могу просто использовать равно или через карту, но здесь, в Apache луче, все по-другому.

1 Ответ

0 голосов
/ 29 мая 2020

Простым и правильным подходом было бы использовать Count.<T>perElement(), например:

Pipeline p = ...;
PCollection<T> elements = p.apply(...); // read elements
PCollection<KV<T, Long>> elementsCounts =
    elements.apply(Count.<T>perElement());
PCollection<TableRow> results = elementsCounts.apply(ParDo.of(
    new FormatOutputFn()));

Хотя, верно, для этого вам понадобится детерминированный кодер c элементов. Поэтому, если это не так (как я понимаю из того, что вы сказали выше), вам нужно добавить шаг перед Count, чтобы преобразовать элемент в другое представление, где можно будет иметь детерминированный c кодер (например, AvroCoder , например).

Если это невозможно по каким-либо причинам, то другим обходным решением может быть вычисление uniq ha sh для каждого элемента (но значение ha sh должно быть определено c как хорошо), создайте KV для каждого элемента с новым ha sh как Key и элементом как Value и используйте GroupByKey ниже по течению, чтобы получить сгруппированный кортеж с одинаковыми значениями.

Также обратите внимание, что, поскольку PubSub является неограниченным источником, вам необходимо «ограничить» свой ввод любым типом стратегии Windows (кроме Global one), поскольку вся ваша группа / объединяется операции должны производиться внутри окна. Взгляните на WindowedWordCount в качестве примера решения аналогичной проблемы.

...