Потоки Кафки: повторы - PullRequest
0 голосов
/ 05 июля 2018

Кафка версия - 1.0.1
Я получаю следующее исключение через случайные интервалы. Попытка увеличить request.timeout.ms до 5 минут, он все еще продолжал время ожидания через случайные интервалы (несколько часов). Непонятно, почему возникает исключение, но перезапуск, похоже, восстанавливается с того места, где он ушел, но требует ручного задания. Итак, попытался включить повторные попытки, но это, кажется, не дает никакого эффекта, потому что я не вижу никаких повторов в журналах (что означает неудачу, затем попытки в первый раз, снова неудачи, затем во второй раз и до тех пор, пока не будет предпринята максимальная попытка). Не могли бы вы пролить некоторый свет на приведенное ниже исключение и посоветовать, как мы можем позволить приложению Kafka stream продолжить работу при возникновении этого исключения, возможно, повторите попытку? Если нам нужно увеличить значение request.timeout.ms для максимального значения, какой недостаток нам нужно знать, а это означает, что мы не должны позволить потоку бесконечно переходить в состояние зависания при сбое брокера?

props.put (ProducerConfig.RETRIES_CONFIG, 3);

    2018-07-05 06:04:25 ERROR Housestream:91 - Unknown Exception occurred
    org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending since an error caught with a previous record (key GCB21K1X value [L@5e86f18a timestamp 1530783812110)
    to topic housestream-digitstore-changelog due to org.apache.kafka.common.errors.TimeoutException: Expiring 201 record(s) for housestream-digitstore-changelog: 30144 ms has passed since last append.
            at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
            at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
            at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
            at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
            at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
            at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
            at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 201 record(s) for housestream-digitstore-changelog: 30144 ms has passed since last append

Попытка увеличить время ожидания запроса до максимального целочисленного значения, но натолкнулась на другое исключение времени ожидания.

2018-07-05 12:22:15 ERROR Housestream:179 - Unknown Exception occurred
org.apache.kafka.streams.errors.StreamsException: task [1_0] Exception caught while punctuating processor 'validatequote'
        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:267)
        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54)
        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:619)
        at org.apache.kafka.streams.processor.internals.AssignedTasks.punctuate(AssignedTasks.java:430)
        at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:324)
        at org.apache.kafka.streams.processor.internals.StreamThread.punctuate(StreamThread.java:969)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:834)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending since an error caught with a previous record (key 32342 value com.int.digital.QUOTE@2c73fa63 timestamp 153083237883) to topic digital_quote due to org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms..
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:819)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:100)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:113)
        at org.cox.processor.CheckQuote.handleTasks(CheckQuote.java:122)
        at org.cox.processor.CheckQuote$1.punctuate(CheckQuote.java:145)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$4.run(ProcessorNode.java:131)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:134)
        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:263)
        ... 8 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
...