Какфа Производитель Сообщение Доставка с подтверждением = все и заполнить - PullRequest
0 голосов
/ 07 мая 2019

Создание Kafka Producer с конфигурацией "acks = all".

Имеет ли какое-либо значение вызов флеша с указанным выше конфиг?.

AS

acks = all Это означает, что ведущий будет ожидать полного набора синхронных реплик для подтверждения записи.Это гарантирует, что запись не будет потеряна, пока хотя бы одна синхронная реплика остается в живых.Это самая сильная из доступных гарантий.Это эквивалентно настройке acks = -1.

Ответы [ 2 ]

3 голосов
/ 11 мая 2019

Позвольте мне сначала попытаться вызвать различие между flush() и acks, прежде чем я перейду к двум вопросам.

flush() - Это метод, который должен вызываться в продюсере длясообщения брокерам из буфера (настраиваемые), поддерживаемые на стороне производителя.Вы бы либо вызвали этот метод, либо close() для отправки сообщений посредникам из буфера производителя.Это вызывается автоматически, если буферная память, доступная производителю, заполняется (как описано Маноджем в его ответе).Тем не менее,

acks=ALL является обязанностью брокера, т.е. отправлять подтверждение обратно производителю после синхронной репликации сообщений другим брокерам согласно настройке, запрашиваемой у производителя.Вы можете использовать этот параметр, чтобы настроить семантику доставки сообщений.В этом случае, как только сообщения будут реплицированы в назначенные синхронные реплики, брокер отправит подтверждение производителю со словами: «Я получил ваши сообщения».

Теперь по вашим вопросам, т. Е. Еслиесть какое-то значение вызова сброса с параметром acks и того, будет ли производитель ждать запуска сброса перед отправкой в ​​брокер.

Что ж, асинхронный характер производителя гарантирует, что производительне ждет.Однако, если вы вызываете flush() явно или если он вызывается сам по себе, любые дальнейшие отправки будут блокироваться, пока производитель не получит подтверждение от брокера.Таким образом, отношения между этими двумя очень тонкие.

Надеюсь, это поможет!

3 голосов
/ 07 мая 2019

Согласно документации

вровень ():

Вызов этого метода делает все буферизованные записи доступными сразу. отправить (даже если linger_ms больше 0) и блокирует завершение запросов, связанных с этими записями. пост-условие для flush () состоит в том, что любая ранее отправленная запись будет иметь завершено (например, Future.is_done () == True). Запрос рассматривается Завершено, когда либо оно успешно подтверждено в соответствии с Конфигурация «acks» для производителя, или это приводит к ошибке.

Другие потоки могут продолжать отправлять сообщения, пока один поток заблокировано ожидание завершения вызова; тем не менее, нет никаких гарантий сделано о завершении сообщений, отправленных после вызова сброса начинается.

flush () будет по-прежнему блокировать клиентское приложение, пока все сообщения не будут отправлены, даже если ack = 0. Единственное, что он не будет ждать подтверждения, блок только до тех пор, пока не будет отправлен буфер.

flush () с ack = all гарантирует, что сообщения были отправлены и реплицированы в кластере с необходимым коэффициентом репликации.

Наконец, чтобы ответить на ваш вопрос: Будет ли он ждать запуска сброса перед отправкой брокеру ?

Ответ: Не обязательно. Производитель продолжает отправлять сообщения с интервалом или размером пакета ( buffer.memory контролирует общий объем памяти, доступной производителю для буферизации ). Но всегда полезно использовать flush (), чтобы убедиться, что вы отправляете все сообщения.

Обратитесь к этой ссылке для получения дополнительной информации.

...