Как Кафка узнает, нужно ли «откатить или откатить» транзакцию? - PullRequest
1 голос
/ 15 мая 2019

С точно один раз KIP , касающийся идемпотентности производителя при перезапуске приложения с помощью InitPidRequest:

2.1 Когда указан TransactionalId Если установлена ​​конфигурация транзакции.idэтот TransactionalId передается вместе с InitPidRequest, и сопоставление с соответствующим PID регистрируется в журнале транзакций на этапе 2a.Это позволяет нам возвращать один и тот же PID для TransactionalId будущим экземплярам производителя и, следовательно, позволяет восстанавливать или прерывать ранее незавершенные транзакции.

Помимо возврата PID, InitPidRequest выполняет следующие задачи:

  1. Увеличивает эпоху PID, так что любой предыдущий экземпляр производителя-зомби отгорожен и не может двигаться дальше со своей транзакцией.

  2. Восстанавливает (откатывает или откатывает) любую транзакцию, оставшуюся незавершенной предыдущим экземпляром производителя.Обработка InitPidRequest является синхронной.Как только он возвращается, производитель может отправлять данные и начинать новые транзакции.

Когда производитель отказывает и запускается снова и выполняется InitPidRequest, какие ситуации являются последней транзакцией«откат» (думаю, это означает «совершено») или «откат»?Как это контролируется?

1 Ответ

1 голос
/ 20 мая 2019

Ключевым компонентом, который позволяет Kafka добиться этого, является Координатор транзакций . Это было введено как часть упомянутого вами KIP. Координатор транзакций создается посредником как часть процесса инициализации и сохраняет в памяти следующую информацию:

  1. Карта от TransactionalId до назначенного PID, номер текущей эпохи (метка времени Unix) и значение времени ожидания транзакции
  2. Сопоставление PID с текущим текущим статусом транзакции производителя, указанным PID, темой раздела-участника и последним временем обновления этого статуса

Теперь, чтобы ответить на ваш вопрос о переносе или возврате транзакции :

Когда производитель отказывает и перезапускается, он отправляет новый InitPidRequest Координатору транзакций, если производитель приходит с непустым TransactionalId (предоставляется в качестве параметра конфигурации приложением производителя).

Координатор транзакций при получении этого запроса затем проверяет, существует ли уже запись с предоставленным TransactionalId в отображении в памяти (пункт 1 выше). Если существует сопоставление, оно будет искать PID во второй карте памяти (пункт 2 выше), чтобы проверить, есть ли какая-либо текущая транзакция с этим PID:

  • Если есть текущая транзакция, которая находится в состоянии запуска, т.е. BEGIN, то транзакция будет прервана ( Примечание : Это откат версия )
  • Если есть текущая транзакция, которая началась и находится в PREPARE_ABORT или PREPARE_COMMIT, то Координатор транзакций будет ждать, пока транзакция не пройдет, либо до COMPLETE_ABORT ( откат версия ) или COMPLETE_COMMIT ( откат версия ).

После этого Координатор транзакций отвечает последним PID и отметкой времени эпохи для TransactionalId, и производитель может затем начать отправку новых транзакций.

Я пытался свести объяснение к минимуму, но если вас интересуют более подробные сведения, вот справочный документ для справки.

Надеюсь, это поможет!

...