Моя идея такова:
Сначала groupByKey, затем ip, устройство уникально, затем map [ip, device] ip - это ключевое значение устройства.Снова GroupByKey, я думаю, что значение счетчика - это количество устройств, соответствующих ip.
Запись Кафки равна
значение ключа (ip, deviceId)
1 127.0.0.1,aa-bb-cc
2 127.0.0.1, aa-bb-cc
3 127.0.0.1, aa-bb-cc
..... (больше, Но все значения 127.0.0.1, aa-bb-cc)
Я хочу получить количество идентификаторов устройств, которыми владеет ip, во временном окне переключения.
код :
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> records = builder.stream(topic);
KStream<String, String> formatRecoed = records.map(new KeyValueMapper<String, String, KeyValue<String, String>>(){
@Override
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<>(value, key);
}
}
formatRecoed.groupByKey().windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60)).count().toStream(new KeyValueMapper<Windowed<String>, Long, String>(){
@Override
public String apply(Windowed<String> key, Long value) {
return key.toString();
}
}).map(new KeyValueMapper<String, Long, KeyValue<String, String>>(){
@Override
public KeyValue<String, String> apply(String key, Long value) {
String[] keys = key.split(",");
return new KeyValue<>(keys[0], keys[1]);
}
}).groupByKey().windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60)).count().toStream(new KeyValueMapper<Windowed<String>, Long, String>(){
@Override
public String apply(Windowed<String> key, Long value) {
return key.toString();
}
}).map(new KeyValueMapper<String, Long, KeyValue<String, String>>(){
@Override
public KeyValue<String, String> apply(String key, Long value) {
return new KeyValue<>(key, "" + value);
}
}).to("topic");
Ожидаемый результат. Каждое временное окно имеет значение
значение ключа
127.0.0.1@1543495068000/1543495188000 1
127.0.0.1@1543495074000/1543495194000 1
127.0.0.1@1543495080000/1543495200000 1
Но мой результат работы :
127.0.0.1@1543495068000/1543495188000 3
127.0.0.1@1543495074000/1543495194000 4
127.0.0.1@1543495080000/1543495200000 1
Почему это?
Я с нетерпением жду, когда кто-нибудь мне поможет.