Подавить агрегацию до нестандартного состояния - PullRequest
1 голос
/ 23 мая 2019

Я использую Kafka DSL.Как бы я продолжил подавлять вывод агрегации (поведение, подобное this ) с пользовательским условием?

Скажем, для каждого ключа у меня могут быть события START и STOP.Я хочу агрегировать этот ключ только при наступлении события STOP или по истечении времени ожидания.

Требуемый поток будет примерно таким:

time    input-topic                     output-topic
1       key1:{type:start, time: 0}    ...
3       key2:{type:start, time: 2}    ...
4       key1:{type:stop, time:3}      ...
4+e     ...                           key1:{type:closed, duration:3}
61      ...                           ...
61+e    ...                           key2:{type:timeout, duration:60}

, где время ожидания составляет 60 единиц времени.и e - произвольное время, необходимое потоку для обработки события.

Код (пока псевдокод) будет выглядеть примерно так:

KStream<String,String> sourceStream = builder.stream("input-topic", Consumed.with(stringSerializer, stringSerializer));
KGroupedStream<String, String> groupedStream = sourceStream
        .groupByKey();

KTable<String, String> aggregatedStream = groupedStream
        .suppress(Suppressed.untilWindowCloses(myCustomCondition()))
        .aggregate(
                () -> null,
                (aggKey, newValue, aggValue) -> aggregateStartStop(aggValue, newValue),
                Materialized
                        .<String, String, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store")
                        .withValueSerde(Serdes.String())
        );
aggregatedStream.toStream();

KafkaStreams streams = new KafkaStreams(builder.build(), streamsSettings);

streams.start();

1 Ответ

1 голос
/ 24 мая 2019

Вы можете использовать KTable для хранения состояния (в вашем случае, type) вместе с 60-секундным окном.Всякий раз, когда вы получаете событие для этого конкретного ключа, вы обновляете состояние и время.Затем вы можете использовать фильтр перед методом .to(), чтобы отправлять или не отправлять сообщение в исходящую тему в зависимости от состояния (type).

Взгляните на пост в блоге Нила Эйвери здесь: https://www.confluent.io/blog/journey-to-event-driven-part-4-four-pillars-of-event-streaming-microservices И прокрутите вниз до разбивки потока событий 1. Платежные потоки Это то, откуда я взял идею.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...