Как добиться аналогичного sql: выберите a, count (отличный (b)) из группы x с помощью потока kafka - PullRequest
0 голосов
/ 29 ноября 2018

Моя идея такова:

Сначала groupByKey, затем ip, устройство уникально, затем map [ip, device] ip - это ключевое значение устройства.Снова GroupByKey, я думаю, что значение счетчика - это количество устройств, соответствующих ip.

Запись Кафки равна

значение ключа (ip, deviceId)

1 127.0.0.1,aa-bb-cc

2 127.0.0.1, aa-bb-cc

3 127.0.0.1, aa-bb-cc

..... (больше, Но все значения 127.0.0.1, aa-bb-cc)

Я хочу получить количество идентификаторов устройств, которыми владеет ip, во временном окне переключения.

код :

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> records = builder.stream(topic);
KStream<String, String> formatRecoed = records.map(new KeyValueMapper<String, String, KeyValue<String, String>>(){

        @Override
        public KeyValue<String, String> apply(String key, String value) {

              return new KeyValue<>(value, key);
         }
}
formatRecoed.groupByKey().windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60)).count().toStream(new KeyValueMapper<Windowed<String>, Long, String>(){
        @Override
        public String apply(Windowed<String> key, Long value) {
            return key.toString();
        }

    }).map(new KeyValueMapper<String, Long, KeyValue<String, String>>(){

        @Override
        public KeyValue<String, String> apply(String key, Long value) {
            String[] keys = key.split(",");
            return new KeyValue<>(keys[0], keys[1]);
        }
    }).groupByKey().windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60)).count().toStream(new KeyValueMapper<Windowed<String>, Long, String>(){
        @Override
        public String apply(Windowed<String> key, Long value) {
            return key.toString();
        }

    }).map(new KeyValueMapper<String, Long, KeyValue<String, String>>(){

        @Override
        public KeyValue<String, String> apply(String key, Long value) {
            return new KeyValue<>(key, "" + value);
        }
    }).to("topic");

Ожидаемый результат. Каждое временное окно имеет значение

значение ключа

127.0.0.1@1543495068000/1543495188000 1

127.0.0.1@1543495074000/1543495194000 1

127.0.0.1@1543495080000/1543495200000 1

Но мой результат работы :

127.0.0.1@1543495068000/1543495188000 3

127.0.0.1@1543495074000/1543495194000 4

127.0.0.1@1543495080000/1543495200000 1

Почему это?

Я с нетерпением жду, когда кто-нибудь мне поможет.

1 Ответ

0 голосов
/ 29 ноября 2018

В вашем коде есть два окна, и это может быть причиной проблем.Я бы предложил этот поток:

records.map((key, value) -> {
      String[] data = value.split(",");
      return KeyValue.pair(data[0], data[1]);
    })
    .groupByKey() // by IP
    .windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60))
    .reduce((device1, device2) -> device1 + "|" + device2)
    .toStream() // stream of lists of devices per IP in window
    .mapValues(devices -> new HashSet<>(Arrays.asList(devices.split("|"))) // set of devices
    .mapValues(set -> set.size().toString())

Результирующий KStream - это оконный поток (IP, count(distinct(devices))) (обе строки), так что вы можете переслать его в другую тему.В этом методе предполагается, что в именах устройств отсутствует один символ (|), если его нет, вам нужно изменить метод сериализации.

...