Я создаю приложение Kafka Streams, которое генерирует события изменений, сравнивая каждый новый вычисляемый объект с последним известным объектом.
Таким образом, для каждого сообщения во входной теме я обновляю объект в хранилище состояний и время от времени (с помощью пунктуации) применяю вычисление к этому объекту и сравниваю результат с предыдущим результатом вычисления (полученным из другой государственный магазин).
Чтобы убедиться, что эта операция согласована, я делаю следующее после точечных триггеров:
- написать кортеж в государственное хранилище
- сравнить два значения, создать события изменения и
context.forward
их. Так что события идут в тему результатов.
- поменяйте местами кортеж по new_value и запишите его в хранилище состояний
Я использую этот кортеж для сценариев, в которых происходит сбой или изменение баланса приложения, поэтому я всегда могу отправить правильный набор событий перед продолжением.
Теперь я заметил, что результирующие события не всегда согласованы, особенно если приложение часто перебалансируется. Похоже, что в редких случаях приложение Kafka Streams отправляет события в тему результатов, но тема журнала изменений еще не обновлена. Другими словами, я создал что-то для темы результатов, но моя тема журнала изменений еще не находится в том же состоянии.
Итак, когда я выполняю stateStore.put()
и вызов метода успешно возвращается, есть ли гарантии, когда он будет в теме журнала изменений?
Могу ли я применить сброс журнала изменений? Когда я делаю context.commit()
, когда произойдет этот сброс + коммит?