Кафка стрим: запись и агрегирование - PullRequest
0 голосов
/ 04 мая 2018
[
    {
        "device_nm": "x1",
        "type": "external",
        "mtrc1": 100,
        "mtrc2": 25,
        "starttime": "2018-05-04 01:00:00",
        "model": "t20"
    },
    {
        "device_nm": "x1",
        "type": "external",
        "mtrc1": 5,
        "mtrc2": 11,
        "starttime": "2018-05-04 02:00:00",
        "model": "t20"
    },
    {
        "device_nm": "x1",
        "type": "internal",
        "mtrc1": 35,
        "mtrc2": 15,
        "starttime": "2018-05-04 01:00:00",
        "model": "t40"
    },
    {
        "device_nm": "x1",
        "type": "internal",
        "mtrc1": 53,
        "mtrc2": 22,
        "starttime": "2018-05-04 02:00:00",
        "model": "t40"
    }
]

Предполагая, что каждое сообщение Kafka содержит набор объектов JSON, он хотел использовать KStream / KTable для выполнения группирования по device_nm, type, truncate (starttime) и агрегата mtrc1, mtrc2. Вывод должен быть таким, как показано ниже:

[
    {
        "device_nm": "x1",
        "type": "external",
        "mtrc1": 105,
        "mtrc2": 36,
        "date": "2018-05-04",
        "model": "t20"
    },
    {
        "device_nm": "x1",
        "type": "internal",
        "mtrc1": 88,
        "mtrc2": 37,
        "date": "2018-05-04",
        "model": "t40"
    }
]

Как мы можем использовать совокупный API, сохраняя при этом все атрибуты?

1 Ответ

0 голосов
/ 04 мая 2018

KafkaStream groupby, база агрегации по ключу, поэтому вы должны были сделать ключ с несколькими ключами

public class MKey {
    String device;
    String type;
}

public class MBody {
    int mtric1;
    int mricc2;
}

KStream<MKey, MBody> stream =
        sb.stream("topic",
                  Consumed.with(new JsonSerde<>(MKey.class), new JsonSerde<>(MBody.class, objectMapper)));
    stream.groupByKey()
          .aggregate(MBody::new,
                     (key, value, aggr) -> {
                         aggr.mricc2 += value.mricc2;
                         aggr.mtric1 += value.mtric1;
                         return aggr;
                     });
...