Я пытаюсь подключиться к разъему Flink kafka 0.11, но он продолжает выдавать мне эту ошибку при запуске задания.
java.lang.RuntimeException: Error while confirming checkpoint
at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Как я понял из документации kafka, таймаут транзакции должен быть больше интервала контрольной точки, но меньше, чем транзакция брокера. max.timeout.ms.
Мой кластер настроен следующим образом:
- Flink версия 1.4.2
- Приложение с Flink-connector-kafka-0.11_2.11
- Интервал контрольной точки: 5000 мс
- Наблюдаемое время сквозной контрольной точки: 2 с
Конфигурация производителя Kafka:
transactional.id : tx-kafka-topic1
transaction.timeout.ms : 30000
acks: all
enable.idempotence : true
retries: 3
max.in.flight.requests.per.connection : 1
Брокер Kafka (kafka_2.11-1.0.0-cp1.jar) с конфигурацией сервера:
transaction.max.timeout.ms=120000
transaction.state.log.replication.factor=3
Мне кажется, интервал не перекрывается друг с другом, нозадание все еще не выполнено с ошибкой выше.Цените, если кто-то может указать мне правильное направление.