Похоже, что поток Кафки с окном сеанса с льготным периодом и подавлением не может вывести окончательное событие, если нет постоянного потока входных записей.
Контекст: Мы используем сбор данных изменений (CD C) для мониторинга изменений в устаревшей базе данных. Когда пользователь вносит изменения с помощью пользовательского интерфейса, транзакция базы данных изменит таблицы 1..n. Каждый оператор SQL приводит к записи Кафки. Они должны быть агрегированы, чтобы создать одну «триггерную запись», которая используется для запуска дорогостоящего процесса. Процесс должен быть запущен в течение секунды после транзакции в унаследованной базе данных. Только несколько пользователей работают со старым приложением, и поэтому между транзакциями может быть значительное количество времени.
У нас есть приложение Kafka Stream, которое использует окно сеанса и интервал бездействия в 400 мс для агрегировать входящие записи с одним и тем же ключом (идентификатором транзакции) и выводить запись триггера.
У нас есть рабочее решение, но запись триггера записывается только в выходные данные topi c, пока другие транзакции выполняются для создания постоянного потока входящих записей. Нам нужно закрыть окно и записать запись триггера, даже если дальнейших входных записей нет.
Рабочий код здесь: https://github.com/maxant/kafka-data-consistency/blob/714a44689fd48aa28b05b855638ac7ed50dd5ee9/partners/src/main/java/ch/maxant/kdc/partners/ThroughputTestStream.java#L65
Вот сводка этого кода:
stream.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
.aggregate(...)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream((k,v) -> k.key())
.to("throughput-test-aggregated");
Изначально у меня не было подавления и льготного периода. Используя только конфигурацию по умолчанию, я всегда получал заключительное событие для окна, содержащего все агрегированные записи, но оно занимало до 6 секунд после окна 400 мс, что слишком долго, чтобы ждать.
В Чтобы уменьшить задержку и ускорить процесс, я установил CACHE_MAX_BYTES_BUFFERING_CONFIG на 1, но это вызвало выходную запись после каждой агрегации, а не только одну выходную запись.
Я ввел подавление (и вместе с ним льготный период 0 мс), чтобы обеспечить создание только одной выходной записи.
Теперь проблема заключается в том, что я получаю только выходную запись, если новые входные записи поступают после закрытия окна (независимо от того, их ключа).
Тест создает 10 входных записей, все с одним и тем же ключом, с интервалом 10 мс, все в пределах 100 мс. Затем он отдыхает в течение 3 секунд, что позволяет мне отключить его после одной группы из десяти записей. Я ожидаю получить одну выходную запись, но ни одна не поступит, если я не оставлю тестовый запуск, чтобы создать вторую группу входных записей. Эта проблема воспроизводима.
Я прочитал следующие статьи, но не могу найти ничего, что описывает то, что я вижу, а именно, что окончательная запись отправляется на выход topi c только после дополнительных записей (независимо от ключа ) обрабатываются.
Что мне нужно изменить, чтобы окончательная запись отправлялась в мой вывод topi c, даже если дальнейшие записи не обрабатываются?
(Использование Kafka 2.4.1 с клиентом и сервером на Linux)