Комплексная агрегация - PullRequest
1 голос
/ 17 июня 2019

У меня есть данные в теме, которые необходимо подсчитать на нескольких уровнях, и во всех кодах и статьях упоминается только пример подсчета слов.

Примером данных будет:

серийный номер: 123 страна: сша Дата: 05.05.2008 штат: нью йорк город: нью-йорк Посетители: 5

серийный номер: 123 страна: сша Дата: 01/06/2018 штат: нью йорк город: королевы посетители: 10

серийный номер: 456 Дата: 01/06/2018 страна: сша штат: нью йорк город: королевы Посетители: 27

серийный номер: 123 Дата: 01/06/2018 страна: сша штат: нью йорк город: нью-йорк посетители: 867

Я сделал фильтр, groupBy, но агрегат? Извините за Java 8 и & mix, я предпочитаю 8, но изучаю его одновременно

KTable<String, CountryVisitorModel> countryStream1 = inStream
    .filter((key, value) -> value.status.equalsIgnoreCase("TEST_DATA"))
    .groupBy((key, value) -> value.serial)
    .aggregate(
            new Initializer<CountryVisitorModel>() {

            public CountryVisitorModelapply() {
                return new CountryVisitorModel();
            }
        },
        new Aggregator<String, InputModel, CountryVisitorModel>() {

            @Override
            public CountryVisitorModelapply(String key, InputModel value, CountryVisitorModel aggregate) {

    aggregate.serial = value.serial;
    aggregate.country_name = value.country_name;
    aggregate.city_name = value.city_name;

    aggregate.country_count++;
    aggregate.city_count++;
    aggregate.ip_count++;

        //
    return aggregate;
       }
},
Materialized.with(stringSerde, visitorSerde));

Для всех равных serial_id (это будет группа по) посчитать общее количество посетителей за это:

серийный страна штат город total_num_visitors

1 Ответ

1 голос
/ 17 июня 2019

Если каждая запись вносит вклад ровно в один счет, я бы порекомендовал branch() поток и счетчик для каждого подпотока:

KStream stream = builder.stream(...)
KStream[] subStreams = stream.branch(...);

// each record of `stream` will be contained in exactly _one_ `substream`
subStream[0].grouByKey().count(); // or aggregate() instead of count()
subStream[1].grouByKey().count();
// ...

Если ветвление не работает, потому что нужно перейти на одну записьпо нескольким подсчетам вы можете «транслировать» и фильтровать:

KStream stream = builder.stream(...)

// each record in `stream` will be "duplicated" and sent to all `filters`
stream.filter(...).grouByKey().count(); // or aggregate() instead of count()
stream.filter(...).grouByKey().count();
// ...

Использование одного и того же объекта KStream несколько раз и применение нескольких операторов (в нашем случае filter(), каждая запись будет «транслироваться» навсе операторы).Обратите внимание, что запись (т. Е. Объекты) физически не копируется для этого случая, но один и тот же объект входной записи используется для вызова каждого filter().

...