I have two Kafka Streams applications. One listens on, say, topic1 and produces to topic2, and the other listens on topic2 and produces to topic3. Both applications were running fine until the Kafka broker went down. The broker came back up, but the streams applications stopped processing.
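Both applications are essentially a read-transform-write chain. A minimal sketch of the first one, just to illustrate the shape of the topology (the class name, serdes, broker address and the transformation itself are placeholders, not the real code):

// Minimal sketch of the first application: consume from topic1 and produce to topic2.
// The second application is analogous (topic2 -> topic3). Processing logic, serdes
// and the broker address are placeholders here.
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class CollectionApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-collection");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("topic1");
        // ... per-record transformation happens here ...
        source.to("topic2");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}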
Below is the exception from the first streams application:
Exception in thread "streams-collection-7cda47bc-a1db-4ad5-a3d4-bd8f8dc85bf4-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=o365_activity_contenturl, partition=0, offset=2151
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value {"RecordType":6,"ListId":"affd3b1e-5d16-4e36-b97a-871b755b2b40","Version":1,"SourceFileName":"9617","ClientIP":"94.245.89.59","Workload":"OneDrive","UserType":0} timestamp 1527845926991) to topic o365_user_activity due to org.apache.kafka.common.errors.TimeoutException: Expiring 15 record(s) for topic1-0: 32551 ms has passed since batch creation plus linger time.
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 15 record(s) for topic1-0: 32551 ms has passed since batch creation plus linger time
Exception from the second streams application:
Exception in thread "streams-distribution-bf0d8698-f198-4d91-ad66-f0833b4ef398-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value {"item_type":"File","workload":"OneDrive","current_date":"2018-06-01","client_ip":"94.245.89.59"} timestamp 1527845926986) to topic topic3 due to org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for topic3-0: 34706 ms has passed since last attempt plus backoff time.
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for topic3-0: 34706 ms has passed since last attempt plus backoff time
Why does the streams application fail to recover?
UPDATE 1:
After upgrading from Kafka 1.0.0 to 1.1.0, I get additional information in the log:
You can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.
The streams application still does not resume processing after I stop the broker and start it again.
UPDATE 2:
However, when I restart the streams application itself after stopping and starting the Kafka broker, it starts consuming again.
Configs:
props.put(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, 100000);
props.put(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 200000);
props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 60000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
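Following the log hint from UPDATE 1, I assume the producer retry settings would be passed through the Streams config in the same way as the other producer overrides above; a sketch of what I mean (the values are arbitrary placeholders, not tuned):

// Hypothetical additions per the log hint: raise producer retries and retry backoff.
// The values below are arbitrary placeholders, not recommendations from the docs.
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 1000);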