Я использую пружинную кафку с идемпотентной конфигурацией производителя:
это мои реквизиты конфигурации:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Joiner.on(",").join(appProps.getBrokers()));
//configure the following three settings for SSL Encryption
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, appProps.getJksLocation());
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, appProps.getJksPassword());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
props.put(ProducerConfig.RETRIES_CONFIG, 5);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Мой производитель кафки выбрасывает OutOfOrderSequenceException:
2019-03-06 21:25:47 Отправитель [ОШИБКА] [Producer clientId = provider-1] Брокер возвратил org.apache.kafka.common.errors.OutOfOrderSequenceException: Брокер получил порядковый номер не по порядку для тема-раздел тема-1 по смещению -1. Это указывает на потерю данных на брокере и должно быть расследовано.
2019-03-06 21:25:47 TransactionManager [INFO] [Producer clientId = provider-1] ProducerId установлен в -1 с эпохой -1
2019-03-06 21:25:47 В ProducerKafka [ОШИБКА] возникла ошибка при отправке в kafka, повторите попытку
Я не уверен, почему выбрасывается это исключение. Я не мог найти конкретный ответ на это. Официальный Javadoc для исключения гласит следующее:
Это исключение означает, что посредник получил неожиданный порядковый номер от производителя, что означает, что данные могли быть потеряны. Если производитель настроен только на идемпотентность (т. Е. Если установлен параметр enable.idempotence и не настроен ни один транзакционный.id), можно продолжить отправку с тем же экземпляром производителя, но это может привести к переупорядочению отправленных записей. Для транзакционных производителей это фатальная ошибка, и вы должны закрыть производителя.
Значит ли это, что мне нужно использовать транзакционного производителя, чтобы избежать этой проблемы?
В документе KafkaProducer сказано что-то, что делает приведенное выше утверждение двусмысленным: https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
Чтобы включить идемпотентность, для конфигурации enable.idempotence должно быть задано значение true. Если установлено, конфигурация повторов будет по умолчанию установлена в Integer.MAX_VALUE, конфигурация max.in.flight.requests.per.connection будет установлена в 1, а конфигурация acks будет установлена по умолчанию для всех. Для идемпотентного производителя нет изменений API, поэтому существующие приложения не нужно будет модифицировать, чтобы воспользоваться этой функцией.
Чтобы воспользоваться преимуществом идемпотентного производителя, необходимо избегать повторных отправлений на уровне приложений, поскольку их нельзя дублировать. Таким образом, если приложение разрешает идемпотентность, рекомендуется оставить конфигурацию повторных попыток неустановленной, поскольку по умолчанию она будет равна Integer.MAX_VALUE. Кроме того, если send (ProducerRecord) возвращает ошибку даже при бесконечных повторных попытках (например, если в буфере истекает срок действия сообщения перед отправкой), рекомендуется завершить работу производителя и проверить содержимое последнего созданного сообщения, чтобы убедиться, что что это не дублируется. Наконец, производитель может гарантировать идемпотентность только для сообщений, отправленных в течение одного сеанса.
В приведенном выше утверждении четко сказано, что все, что нужно для идемпотентного производителя, это просто использовать свойство enable.idempotence
. Однако исключение гласит, что я должен использовать это свойство transactional.id
.
Как правильно создать идемпотентного асинхронного производителя, не имея дело с фатальным OutOfOrderSequenceException
.