Kafka Streams DSL Cache - обработка надгробий - PullRequest
1 голос
/ 10 июля 2020

Мне нужно использовать кеш DSL Kafka Streams, чтобы уменьшить объем записи для последующих процессоров. Однако наше приложение обрабатывает надгробные плиты, что создает сложности. Например, учитывая следующие записи для одного ключа, K1:

<K1, V1>
<K1, V2>
<K1, V3>

Кэш DSL может выдавать только последнюю запись:

<K1, V3>

С включенным кешем DSL off, конечно, он выдаст все промежуточные записи:

<K1, V1>
<K1, V2>
<K1, V3>

Пока все работает, как ожидалось. Но с надгробными плитами необработанная последовательность становится:

<K1, V1>
<K1, V2>
<K1, V3>
<K1, NULL>

Таким образом, в зависимости от того, когда кэш очищен, мы можем никогда не увидеть окончательный счетчик. например,

<K1, V1>       | cached
<K1, V2>       | flushed
<K1, V3>       | cached
<K1, NULL>     | deleted

будет означать, что <K1, V2> сброшен, но никогда <K1, V3>. Семантика, которую я пытаюсь достичь, включает сброс последней записи для данного ключа в кеш всякий раз, когда для этого ключа получено захоронение.

<K1, V1>       | cached
<K1, V2>       | flushed
<K1, V3>       | cached
<K1, NULL>     | emit the latest record (`<K1, V3>`), then delete.

Мне не удалось сделать это с помощью DSL, и Processor API не раскрывает базовый кеш, поэтому и там не может. Я думаю о реализации настраиваемого кеша в памяти и использовании его с API процессора, но это усложняется, потому что кажется, что при некорректном завершении работы приложения может произойти потеря данных (например, SIGKILL). Не уверен, как кеш DSL обрабатывает некорректное завершение работы (например, может быть потеря данных), поэтому, возможно, реализация, о которой я думаю, может быть смоделирована после кеша DSL.

В любом случае, я не задумываюсь над этой проблемой? Есть ли способ передать sh последнюю запись из кеша DSL при получении надгробия вместо реализации настраиваемого кеша?

1 Ответ

0 голосов
/ 11 июля 2020

мы, возможно, никогда не увидим окончательный счет

Я понимаю, что вы говорите, однако в этом случае "последней" записью является надгробие, так что вы видите последнюю. . Вам нужен конкретный c промежуточный результат. DSL не позволяет делать это с такой мелкозернистой конфигурацией.

API процессора не раскрывает базовый кеш

Ну, это так. Набрав Stores.keyValueStoreBuilder(), вы можете позвонить withCachingEnabled() на возвращенный StoreBuilder. Обратите внимание, что в этом случае по умолчанию нижестоящие записи не отправляются, и вам необходимо реализовать logi c отправки вручную. Ie, вы не знаете, когда очищается кеш, и если он сбрасывается, он сбрасывается только на локальный диск и журнал изменений c, но не данные отправляются вниз по течению на flu sh.

Вы можете зарегистрировать знаки препинания для передачи данных через регулярный интервал времени. Кроме того, каждый раз, когда вы обрабатываете надгробную плиту, вы можете выдать текущее сохраненное значение из хранилища, прежде чем выполнять удаление в хранилище.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...