Kafka Streams - процессная контекстная фиксация - PullRequest
0 голосов
/ 07 января 2019

Должны ли мы когда-либо сами вызывать processorContext.commit() в Processor реализации? Я имею в виду вызов метода commit внутри запланированной Punctuator реализации или внутри process метода.

в каких случаях мы должны это делать, и нужно ли нам это вообще? этот вопрос касается как Kafka DSL с transform(), так и Processor API.

кажется, что Kafka Streams обрабатывает это сам, также вызов processorContext.commit() не гарантирует, что это будет сделано немедленно.

Ответы [ 2 ]

0 голосов
/ 09 января 2019

Для случая использования я собираю определенное количество записей в методе обработки процессором и записываю пакетные записи в файл из функции процесса, если размер пакета достигает определенного числа (скажем, 10).

Допустим, мы записываем один пакет записей в файл, и происходит сбой системы в момент, когда происходит фиксация (поскольку мы не можем вызвать явные фиксации). При следующем запуске потока процессор обрабатывает записи с последнего принятого смещения. Это означает, что мы можем записывать некоторые дубликаты данных в файлы. В любом случае, чтобы избежать записи дублирующих данных ??

0 голосов
/ 08 января 2019

Можно вызывать commit() - либо с процессора, либо с пунктуации - вот почему предлагается этот API.

В то время как Kafka Streams фиксирует с регулярным (настраиваемым) интервалом, вы можете запросить промежуточные коммиты при его использовании. Одним из примеров использования может быть то, что вы обычно делаете дешевые вычисления, но иногда вы делаете что-то дорогое и хотите зафиксировать как можно скорее после этой операции вместо ожидания следующего интервала фиксации (чтобы уменьшить вероятность сбоя после дорогой операции и следующий интервал фиксации). Другой вариант использования будет, если вы установите интервал фиксации на MAX_VALUE, что эффективно «отключит» регулярные коммиты и решит, когда фиксировать базу на основе вашей бизнес-логики.

Полагаю, что для большинства случаев использования вызов commit() необязателен.

...