У меня есть данные в теме, которые необходимо подсчитать на нескольких уровнях, и во всех кодах и статьях упоминается только пример подсчета слов.
Примером данных будет:
серийный номер: 123
страна: сша
Дата: 05.05.2008
штат: нью йорк
город: нью-йорк
Посетители: 5
серийный номер: 123
страна: сша
Дата: 01/06/2018
штат: нью йорк
город: королевы
посетители: 10
серийный номер: 456
Дата: 01/06/2018
страна: сша
штат: нью йорк
город: королевы
Посетители: 27
серийный номер: 123
Дата: 01/06/2018
страна: сша
штат: нью йорк
город: нью-йорк
посетители: 867
Я сделал фильтр, groupBy, но агрегат?
Извините за Java 8 и & mix, я предпочитаю 8, но изучаю его одновременно
KTable<String, CountryVisitorModel> countryStream1 = inStream
.filter((key, value) -> value.status.equalsIgnoreCase("TEST_DATA"))
.groupBy((key, value) -> value.serial)
.aggregate(
new Initializer<CountryVisitorModel>() {
public CountryVisitorModelapply() {
return new CountryVisitorModel();
}
},
new Aggregator<String, InputModel, CountryVisitorModel>() {
@Override
public CountryVisitorModelapply(String key, InputModel value, CountryVisitorModel aggregate) {
aggregate.serial = value.serial;
aggregate.country_name = value.country_name;
aggregate.city_name = value.city_name;
aggregate.country_count++;
aggregate.city_count++;
aggregate.ip_count++;
//
return aggregate;
}
},
Materialized.with(stringSerde, visitorSerde));
Для всех равных serial_id (это будет группа по)
посчитать общее количество посетителей за это:
серийный страна штат город total_num_visitors