Ключевым компонентом, который позволяет Kafka добиться этого, является Координатор транзакций . Это было введено как часть упомянутого вами KIP. Координатор транзакций создается посредником как часть процесса инициализации и сохраняет в памяти следующую информацию:
- Карта от
TransactionalId
до назначенного PID
, номер текущей эпохи (метка времени Unix) и значение времени ожидания транзакции
- Сопоставление
PID
с текущим текущим статусом транзакции производителя, указанным PID
, темой раздела-участника и последним временем обновления этого статуса
Теперь, чтобы ответить на ваш вопрос о переносе или возврате транзакции :
Когда производитель отказывает и перезапускается, он отправляет новый InitPidRequest
Координатору транзакций, если производитель приходит с непустым TransactionalId
(предоставляется в качестве параметра конфигурации приложением производителя).
Координатор транзакций при получении этого запроса затем проверяет, существует ли уже запись с предоставленным TransactionalId
в отображении в памяти (пункт 1 выше). Если существует сопоставление, оно будет искать PID
во второй карте памяти (пункт 2 выше), чтобы проверить, есть ли какая-либо текущая транзакция с этим PID
:
- Если есть текущая транзакция, которая находится в состоянии запуска, т.е.
BEGIN
, то транзакция будет прервана
( Примечание : Это откат версия )
- Если есть текущая транзакция, которая началась и находится в
PREPARE_ABORT
или PREPARE_COMMIT
, то Координатор транзакций будет ждать, пока транзакция не пройдет, либо до COMPLETE_ABORT
( откат версия ) или COMPLETE_COMMIT
( откат версия ).
После этого Координатор транзакций отвечает последним PID
и отметкой времени эпохи для TransactionalId
, и производитель может затем начать отправку новых транзакций.
Я пытался свести объяснение к минимуму, но если вас интересуют более подробные сведения, вот справочный документ для справки.
Надеюсь, это поможет!