Простым и правильным подходом было бы использовать Count.<T>perElement()
, например:
Pipeline p = ...;
PCollection<T> elements = p.apply(...); // read elements
PCollection<KV<T, Long>> elementsCounts =
elements.apply(Count.<T>perElement());
PCollection<TableRow> results = elementsCounts.apply(ParDo.of(
new FormatOutputFn()));
Хотя, верно, для этого вам понадобится детерминированный кодер c элементов. Поэтому, если это не так (как я понимаю из того, что вы сказали выше), вам нужно добавить шаг перед Count
, чтобы преобразовать элемент в другое представление, где можно будет иметь детерминированный c кодер (например, AvroCoder
, например).
Если это невозможно по каким-либо причинам, то другим обходным решением может быть вычисление uniq ha sh для каждого элемента (но значение ha sh должно быть определено c как хорошо), создайте KV
для каждого элемента с новым ha sh как Key
и элементом как Value
и используйте GroupByKey
ниже по течению, чтобы получить сгруппированный кортеж с одинаковыми значениями.
Также обратите внимание, что, поскольку PubSub
является неограниченным источником, вам необходимо «ограничить» свой ввод любым типом стратегии Windows
(кроме Global
one), поскольку вся ваша группа / объединяется операции должны производиться внутри окна. Взгляните на WindowedWordCount в качестве примера решения аналогичной проблемы.