Kafka Streams перестает слушать темы и обрабатывать сообщения, когда брокер выходит из строя - PullRequest
0 голосов
/ 04 июня 2018

У меня есть приложение 2 Kafka Streams.Одно приложение прослушивает, скажем, topic1 и производит на topic2, а другое слушает на topic2 и производит на topic3.Приложения работали нормально до того, как брокер kafka вышел из строя.Посредник вернулся, но потоковые приложения остановились.

Ниже приводится исключение из первого приложения потоков:

Exception in thread "streams-collection-7cda47bc-a1db-4ad5-a3d4-bd8f8dc85bf4-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=o365_activity_contenturl, partition=0, offset=2151
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value {"RecordType":6,"ListId":"affd3b1e-5d16-4e36-b97a-871b755b2b40","Version":1,"SourceFileName":"9617","ClientIP":"94.245.89.59","Workload":"OneDrive","UserType":0} timestamp 1527845926991) to topic o365_user_activity due to org.apache.kafka.common.errors.TimeoutException: Expiring 15 record(s) for topic1-0: 32551 ms has passed since batch creation plus linger time.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 15 record(s) for topic1-0: 32551 ms has passed since batch creation plus linger time

Исключение приложения второго потока:

Exception in thread "streams-distribution-bf0d8698-f198-4d91-ad66-f0833b4ef398-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value {"item_type":"File","workload":"OneDrive","current_date":"2018-06-01","client_ip":"94.245.89.59"} timestamp 1527845926986) to topic topic3 due to org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for topic3-0: 34706 ms has passed since last attempt plus backoff time.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for topic3-0: 34706 ms has passed since last attempt plus backoff time

Почему потоковое приложение не удается восстановить?

ОБНОВЛЕНИЕ 1:

После обновления с Kafka 1.0.0 до1.1.0, у меня есть дополнительная информация в журнале:

You can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.

Приложение Streams по-прежнему не продолжает обработку после того, как я остановил брокер и перезапустил его.

UPDATE 2:

Однако, когда я перезапускаю само приложение потоков, после остановки и запуска брокера kafka оно начинает потреблять.

Конфиги:

props.put(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, 100000);
props.put(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 200000);
props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 60000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...