Kstreams группировки на двух полях, чтобы получить количество - PullRequest
0 голосов
/ 09 февраля 2019

Можем ли мы сгруппировать по двум полям (одно является ключом, а другое значение) и получить количество в kstreams.

Я хочу получить различное количество идентификаторов пользователя (значения) для каждого pid (ключа) .groupByKey не будетдать отличный идентификатор пользователя.Я пытался использовать groupBy вместо groupByKey, но видел синтаксические ошибки.Может кто-нибудь помочь?

   KStream<Integer, Integer> stream = events.map((key, value) -> new KeyValue<Integer, Integer>(value.getpid(), value.getUserId()));

   KGroupedStream<Integer, Integer> groupedStream = stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.Integer());

Ответы [ 2 ]

0 голосов
/ 09 февраля 2019

Если вы хотите считать по идентификатору пользователя и pid, вы можете поместить оба ключа в виде Pojo в ключ:

KStream<UserPid, Integer> stream =
    events.selectKey((key, value) -> new UserPid(value.getpid(), value.getUserId()));
KGroupedStream<Integer, Integer> groupedStream =
    stream.groupByKey(Grouped.with(new UserPidSerde(), Serdes.Integer());

Вам необходимо создать соответствующий класс POJO UserPid и класс serde ** 1005.

0 голосов
/ 09 февраля 2019

Поскольку для каждого pid (ключа) требуется различное количество пользователей (значений), сначала необходимо использовать groupByKey, который сгруппирует все users с одинаковыми pid.Затем вам нужно объединиться в форму set из user (чтобы получить уникальных пользователей).После этого просто получите размер set и вы получите количество отдельных пользователей для каждого pid.

KStream<Integer, Integer> stream = events.map((key, value) -> new KeyValue<Integer, Integer>(value.getpid(), value.getUserId()));
KStream<Integer, Integer> output = stream.groupByKey().
            aggregate((Initializer<Set<Integer>>) HashSet::new,
                    (k, v, current) -> {current.add(v); return current;}).mapValues(Set::size).toStream();
...