Какие гарантии предоставляет Kafka Stream при использовании хранилища состояний RocksDb с журналом изменений? - PullRequest
0 голосов
/ 31 августа 2018

Я создаю приложение Kafka Streams, которое генерирует события изменений, сравнивая каждый новый вычисляемый объект с последним известным объектом.

Таким образом, для каждого сообщения во входной теме я обновляю объект в хранилище состояний и время от времени (с помощью пунктуации) применяю вычисление к этому объекту и сравниваю результат с предыдущим результатом вычисления (полученным из другой государственный магазин).

Чтобы убедиться, что эта операция согласована, я делаю следующее после точечных триггеров:

  1. написать кортеж в государственное хранилище
  2. сравнить два значения, создать события изменения и context.forward их. Так что события идут в тему результатов.
  3. поменяйте местами кортеж по new_value и запишите его в хранилище состояний

Я использую этот кортеж для сценариев, в которых происходит сбой или изменение баланса приложения, поэтому я всегда могу отправить правильный набор событий перед продолжением.

Теперь я заметил, что результирующие события не всегда согласованы, особенно если приложение часто перебалансируется. Похоже, что в редких случаях приложение Kafka Streams отправляет события в тему результатов, но тема журнала изменений еще не обновлена. Другими словами, я создал что-то для темы результатов, но моя тема журнала изменений еще не находится в том же состоянии.

Итак, когда я выполняю stateStore.put() и вызов метода успешно возвращается, есть ли гарантии, когда он будет в теме журнала изменений?

Могу ли я применить сброс журнала изменений? Когда я делаю context.commit(), когда произойдет этот сброс + коммит?

process flow

1 Ответ

0 голосов
/ 02 сентября 2018

Чтобы получить полную согласованность, вам нужно включить processing.guarantee="exaclty_once" - в противном случае, с потенциальной ошибкой, вы можете получить противоречивые результаты.

Если вы хотите остаться с «at_least_once», вы можете использовать один магазин и обновлять магазин после обработки (т.е. после вызова forward()). Это минимизировало временное окно, чтобы получить несоответствия.

И да, если вы вызываете context.commit(), перед тем как смещения входных тем будут зафиксированы, все хранилища будут сброшены на диск, а также все ожидающие записи производителя также будут сброшены.

...