Kafka SessionWindow с подавлением отправляет конечное событие только при наличии постоянного потока входных записей - PullRequest
2 голосов
/ 24 марта 2020

Похоже, что поток Кафки с окном сеанса с льготным периодом и подавлением не может вывести окончательное событие, если нет постоянного потока входных записей.

Контекст: Мы используем сбор данных изменений (CD C) для мониторинга изменений в устаревшей базе данных. Когда пользователь вносит изменения с помощью пользовательского интерфейса, транзакция базы данных изменит таблицы 1..n. Каждый оператор SQL приводит к записи Кафки. Они должны быть агрегированы, чтобы создать одну «триггерную запись», которая используется для запуска дорогостоящего процесса. Процесс должен быть запущен в течение секунды после транзакции в унаследованной базе данных. Только несколько пользователей работают со старым приложением, и поэтому между транзакциями может быть значительное количество времени.

У нас есть приложение Kafka Stream, которое использует окно сеанса и интервал бездействия в 400 мс для агрегировать входящие записи с одним и тем же ключом (идентификатором транзакции) и выводить запись триггера.

У нас есть рабочее решение, но запись триггера записывается только в выходные данные topi c, пока другие транзакции выполняются для создания постоянного потока входящих записей. Нам нужно закрыть окно и записать запись триггера, даже если дальнейших входных записей нет.

Рабочий код здесь: https://github.com/maxant/kafka-data-consistency/blob/714a44689fd48aa28b05b855638ac7ed50dd5ee9/partners/src/main/java/ch/maxant/kdc/partners/ThroughputTestStream.java#L65

Вот сводка этого кода:

      stream.groupByKey()
            .windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
            .aggregate(...)
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream((k,v) -> k.key())
            .to("throughput-test-aggregated");

Изначально у меня не было подавления и льготного периода. Используя только конфигурацию по умолчанию, я всегда получал заключительное событие для окна, содержащего все агрегированные записи, но оно занимало до 6 секунд после окна 400 мс, что слишком долго, чтобы ждать.

В Чтобы уменьшить задержку и ускорить процесс, я установил CACHE_MAX_BYTES_BUFFERING_CONFIG на 1, но это вызвало выходную запись после каждой агрегации, а не только одну выходную запись.

Я ввел подавление (и вместе с ним льготный период 0 мс), чтобы обеспечить создание только одной выходной записи.

Теперь проблема заключается в том, что я получаю только выходную запись, если новые входные записи поступают после закрытия окна (независимо от того, их ключа).

Тест создает 10 входных записей, все с одним и тем же ключом, с интервалом 10 мс, все в пределах 100 мс. Затем он отдыхает в течение 3 секунд, что позволяет мне отключить его после одной группы из десяти записей. Я ожидаю получить одну выходную запись, но ни одна не поступит, если я не оставлю тестовый запуск, чтобы создать вторую группу входных записей. Эта проблема воспроизводима.

Я прочитал следующие статьи, но не могу найти ничего, что описывает то, что я вижу, а именно, что окончательная запись отправляется на выход topi c только после дополнительных записей (независимо от ключа ) обрабатываются.

Что мне нужно изменить, чтобы окончательная запись отправлялась в мой вывод topi c, даже если дальнейшие записи не обрабатываются?

(Использование Kafka 2.4.1 с клиентом и сервером на Linux)

1 Ответ

2 голосов
/ 24 марта 2020

Обновление : у меня ошибка в топологии, исправлена ​​

У меня были те же проблемы, что и у вас при использовании подавления, и это ожидаемое поведение. Поскольку подавление поддерживает только отправку буферизованных записей с использованием времени потока, а не времени настенных часов, если вы прекратите получать новые записи, время потока будет остановлено и Suppress не будет выдавать последнее подавленное окно.

Решение, которое я использовал состоит в том, чтобы написать пользовательское подавление с помощью Processor API (используйте Transfomer, чтобы вы могли использовать DSL для отправки нисходящей записи) с хранилищем состояний, используемым в качестве буфера, а затем проверьте, что windows должно быть flu sh (или emit) для нисходящий процессор всякий раз, когда поступает новая запись или по истечении временного интервала (с использованием WALL_CLOCK_TIME пунктуации).

Трансформатор будет выглядеть следующим образом:

public class SuppressWindowTransformer implements Transformer<Windowed<String>, String, KeyValue<Windowed<String>, String>> {
    private ProcessorContext context;
    private Cancellable cancellable;
    private KeyValueStore<Windowed<String>, String> kvStore;
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        kvStore = (KeyValueStore) context.getStateStore("suppressed_windowed_transaction");
        cancellable = context.schedule(Duration.ofMillis(100), PunctuationType.WALL_CLOCK_TIME, timestamp -> flushOldWindow());
    }

    @Override
    public KeyValue<Windowed<String>, String> transform(Windowed<String> key, String value) {
        kvStore.put(key, value);//buffer (or suppress) the new in coming window
        flushOldWindow();
        return null;
    }

    private void flushOldWindow() {
        //your logic to check for old windows in kvStore then flush

        //forward (or unsuppressed) your suppressed records downstream using ProcessorContext.forward(key, value)
    }

    @Override
    public void close() {
        cancellable.cancel();//cancel punctuate
    }
}

И в ваш поток DSL:

stream.groupByKey()
            .windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
            .aggregate(...)//remove suppress operator and write custom suppress using processor API
            .toStream()
            .transform(SuppressWindowTransformer::new, "suppressed_windowed_transaction")
            .to("throughput-test-aggregated");
...