ПРЕДИСЛОВИЕ:
В нашей организации мы пытаемся использовать Kafka для решения проблемы, которая включает сбор изменений в базе данных Oracle и отправку через Kafka.На самом деле это CDC, для этого мы используем Kafka Connector.
Мы отслеживаем изменения в Oracle, используя запросы Oracle Flashback, это позволяет нам получить временную метку изменения и операции (Вставить, Удалить, Обновить).
После внесения некоторых измененийв таблице, которую мы наблюдаем, Kafka Connector публикует это в теме, мы далее читаем эту тему, используя Kafka Streams.
Проблема в том, что иногда в запросе Flashback появляются одинаковые строки из-за некоторого обновленияв таблице, которая ничего не изменила (это также вызывает изменение флэшбэка), или если в таблице 100 столбцов, а мы наблюдаем только 20, в итоге мы видим повторяющиеся строки в запросе, поскольку ни одно из этих 20 полей не имеетизменено.
Мы используем флэшбэк, чтобы получить измененные строки (включая исключенные).В соединителе мы устанавливаем метку времени + режим приращения (метка времени получается из поля version_starttime запроса флешбэка)
Важно: мы не можем больше касаться БД, я имею в виду, мыне может создавать триггеры вместо использования уже существующей схемы Flashback.
ВОПРОС
Мы пытаемся отфильтровать записи в Кафке, если они есть (ключ, значение)в равной степени по содержанию мы хотим отказаться.Обратите внимание, что это не единственная семантика.Запись будет повторяться с большой разницей по времени.
Если я использую KTable для проверки последнего значения некоторой записи, насколько эффективно это будет после длительного периода?
Я имею в виду внутреннее состояние хранения потребителей, обрабатывается RocksDB и защищенной темой Kafka, поскольку, если я использую неоконный KTable, это внутреннее пространство может оказаться очень большим.
Что считается хорошим подходом в этом сценарии?Не перегружать внутреннее хранилище состояния Kafka Consumers и в то же время иметь возможность узнать, была ли фактическая запись уже обработана время назад.