Kafka Streams: есть ли какие-либо гарантии упорядочения сохранений в хранилища состояний при использовании at_least_once? - PullRequest
0 голосов
/ 17 января 2019

У нас есть топология Java Kafka Streams, построенная с использованием Processor API.

В топологии у нас есть один процессор, который сохраняет в несколько хранилищ состояний.

Поскольку мы используем at_least_once, мы ожидаем увидеть некоторые несоответствия между государственными хранилищами - например, входящая запись приводит к записи как в хранилище состояний A, так и в B, но сбой между сохранениями приводит к тому, что только сохранение в хранилище A записывается в раздел журнала изменений Kafka.

  1. Гарантируем ли мы, что порядок, в котором мы сохраняем, также будет порядком, в котором происходит запись в хранилища состояния? Например. если мы сначала сохраним в хранилище A, а затем в хранилище B, мы, конечно, можем иметь ситуацию, когда запись в оба журнала изменений была успешной, и ситуацию, когда была завершена только запись в журнал изменений A, - но мы можем также оказаться в ситуация, когда только запись в журнал изменений B была завершена?

  2. Какие ситуации приведут к повторам? Конечно, сбой - но как насчет перебалансировки, нового лидера раздела брокера, или когда мы получим ошибку «Смещение фиксации не выполнено» (истекло время ожидания запроса)?

  3. Некоторое время назад мы пытались использовать точно _once, что приводило к множеству сообщений об ошибках, которые не имели для нас смысла. Точно ли это даст нам атомарные записи в нескольких государственных магазинах?

1 Ответ

0 голосов
/ 17 января 2019

Объявление 3. Согласно Исходный проектный документ по поддержке единовременной поддержки в Kafka Streams Я думаю, что с eaxctly_once вы получаете элементарные записи в нескольких хранилищах состояний

Когда вызывается stream.commit (), следующие шаги выполняются по порядку:

  1. Сбросить локальные хранилища состояний (кэши KTable), чтобы убедиться, что все записи журнала изменений отправляются в нисходящем направлении.
  2. Вызовите provider.sendOffsetsToTransactions (offsets), чтобы зафиксировать текущие записанные позиции потребителя в транзакции. Обратите внимание, что хотя потребитель потока может быть разделен между несколькими задачами и, следовательно, несколькими производителями, назначенные разделы задачи всегда являются исключительными, и, следовательно, можно просто зафиксировать смещения назначенных разделов этой задачи.
  3. Вызовите provider.commitTransaction () для подтверждения текущей транзакции. В результате состояние задачи, представленное вышеупомянутым триплетом, фиксируется атомарно.
  4. Вызовите02.beginTransaction () снова, чтобы начать следующую транзакцию.
...