Мне нужно использовать кеш DSL Kafka Streams, чтобы уменьшить объем записи для последующих процессоров. Однако наше приложение обрабатывает надгробные плиты, что создает сложности. Например, учитывая следующие записи для одного ключа, K1
:
<K1, V1>
<K1, V2>
<K1, V3>
Кэш DSL может выдавать только последнюю запись:
<K1, V3>
С включенным кешем DSL off, конечно, он выдаст все промежуточные записи:
<K1, V1>
<K1, V2>
<K1, V3>
Пока все работает, как ожидалось. Но с надгробными плитами необработанная последовательность становится:
<K1, V1>
<K1, V2>
<K1, V3>
<K1, NULL>
Таким образом, в зависимости от того, когда кэш очищен, мы можем никогда не увидеть окончательный счетчик. например,
<K1, V1> | cached
<K1, V2> | flushed
<K1, V3> | cached
<K1, NULL> | deleted
будет означать, что <K1, V2>
сброшен, но никогда <K1, V3>
. Семантика, которую я пытаюсь достичь, включает сброс последней записи для данного ключа в кеш всякий раз, когда для этого ключа получено захоронение.
<K1, V1> | cached
<K1, V2> | flushed
<K1, V3> | cached
<K1, NULL> | emit the latest record (`<K1, V3>`), then delete.
Мне не удалось сделать это с помощью DSL, и Processor API не раскрывает базовый кеш, поэтому и там не может. Я думаю о реализации настраиваемого кеша в памяти и использовании его с API процессора, но это усложняется, потому что кажется, что при некорректном завершении работы приложения может произойти потеря данных (например, SIGKILL). Не уверен, как кеш DSL обрабатывает некорректное завершение работы (например, может быть потеря данных), поэтому, возможно, реализация, о которой я думаю, может быть смоделирована после кеша DSL.
В любом случае, я не задумываюсь над этой проблемой? Есть ли способ передать sh последнюю запись из кеша DSL при получении надгробия вместо реализации настраиваемого кеша?