Подавлять события запуска только при получении новых событий в потоке - PullRequest
0 голосов
/ 26 октября 2019

Я использую потоки Кафки 2.2.1.

Я использую функцию подавления для удержания событий до закрытия окна. Я использую семантику времени события. Однако инициированные сообщения запускаются только после того, как в потоке появляется новое сообщение.

Следующий пример кода извлекается для выявления проблемы:

        KStream<UUID, String>[] branches = is
            .branch((key, msg) -> "a".equalsIgnoreCase(msg.split(",")[1]),
                    (key, msg) -> "b".equalsIgnoreCase(msg.split(",")[1]),
                    (key, value) -> true);

    KStream<UUID, String> sideA = branches[0];
    KStream<UUID, String> sideB = branches[1];

    KStream<Windowed<UUID>, String> sideASuppressed =
            sideA.groupByKey(
                    Grouped.with(new MyUUIDSerde(),
                    Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(31)).grace(Duration.ofMinutes(32)))
            .reduce((v1, v2) -> {
                return v1;
            })
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream();

Сообщения передаются только от 'sideASuppressed'когда новое сообщение попадает в поток' sideA '(сообщения, поступающие в' sideB ', не будут вызывать подавления для вывода каких-либо сообщений, даже если время закрытия окна прошло давно). Хотя в производственной среде проблема, скорее всего, не возникает из-за большого объема, есть достаточно случаев, когда важно не ждать нового сообщения, которое попадает в поток 'sideA'.

Заранее спасибо.

1 Ответ

0 голосов
/ 28 октября 2019

Согласно документации по потокам Kafka:

Время потока увеличивается, только если во всех входных разделах по всем входным темам доступны новые данные (с новыми временными метками). Если хотя бы в одном разделе нет новых доступных данных, время потока не будет увеличено и, следовательно, пунктуация () не будет запущена, если было указано PunctuationType.STREAM_TIME. Это поведение не зависит от настроенного экстрактора меток времени, т. Е. Использование WallclockTimestampExtractor не позволяет запускать функцию punctuate () на настенных часах.

Я не уверен, почему это так, но он объясняет, почемуподавленные сообщения отправляются только тогда, когда сообщения доступны в очереди, которую он использует.

Если у кого-то есть ответ относительно того, почему реализация такова, я буду рад узнать. Такое поведение заставляет мою реализацию отправлять сообщения только для того, чтобы мое подавленное сообщение излучало во времени, и делает код намного менее читабельным.

...