Как сгруппировать записи из одного раздела, используя окно для времени T, в одну запись, каждая запись имеет одинаковый ключ - PullRequest
0 голосов
/ 14 марта 2019

Допустим, у меня есть поток событий.

R1 - {"abc": "значение 1"}

R2 - {"abc": "значение 2"}

R3 - {"abc": "значение 3"}

R4 - {"abc": "значение 4"}

в одном разделе. Я хочу, чтобы производный поток событий из вышеуказанного потока имел событие, подобное

{"abc": ["значение 1", "значение 2", "значение 3", "значение 4"]}

с учетом каждой записи с тем же ключом уже доступно в теме.

Как я могу это сделать с использованием агрегата и groupByKey в Kafka Stream API?

1 Ответ

0 голосов
/ 14 марта 2019

Вот пример потока событий JSON, вы можете попробовать что-то вроде следующего:

KTable<Windowed<String>, JsonNode> timeWindowedAggregatedStream = stream.groupByKey().windowedBy(Duration.ofMinutes(5))
    .aggregate(
        () -> objectMapper::createObjectNode, /* initializer */
        (aggKey, newValue, aggValue) -> {
            final JsonNode element = value.has(fieldName) && value.get(fieldName) != null ? value.get(fieldName) : null;

        final ArrayNode arrayNode = aggregate == null || aggregate.get(fieldName) != null
                ? (ArrayNode) aggregate.get(fieldName)
                : mapper.createArrayNode();

        arrayNode.add(element);
        // TO remove duplicates
        Stream<Object> elementStream = IntStream.range(0, arrayNode.size()).mapToObj(arrayNode::get);
        Set<Object> arrayAsSet = elementStream.collect(Collectors.toSet());
        ObjectNode aggregateNode = mapper.createObjectNode();
        ArrayNode uniqueArrayNode = mapper.valueToTree(arrayAsSet);
        aggregate.set(fieldName, uniqueArrayNode); 
        return aggregate;
} , /* adder */
        Materialized.<String, JsonNode, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
        .withValueSerde(jsonNodeSerde)); /* serde for aggregate value */
...