Kafka Streams throws RecordTooLargeException when forwarding a message from a processor
0 votes / September 24, 2019

Since we migrated to Kafka client 2.3 (it was 1.1 before), we periodically get a RecordTooLargeException when sending a message to a Kafka topic. The max.request.size value for the producer is 524288. As you can see below, the Event object is not even close to the max.request.size limit:


```
(key 4bc2eef4-ac1c-97bf-518b-4a32c38b9e4f value Event(transactionId=88834013-28c3-405d-9f69-81089dfa9246, action=RECORDING_REQUESTED, dataCenter=NEWPORT, ccid=2455, resourceId=4bc2eef4-ac1c-97bf-518b-4a32c38b9e4f, channelId=721ab65b-5333-8c23-4a03-3fff869176c9, canonicalId=462a90a8-7e1e-71a5-4859-eb076e5397ba, seriesId=c8ff610c-77f6-713a-32c3-eac8f6e632fa, externalId=EP001890580021, scheduleDurationInSec=2160, scheduleStartTimeMillisecs=1569356100000, recordingCount=38, dataCenterPath=null, startIndex=0, relativeFolderPath=4bc2eef4-ac1c-97bf-518b-4a32c38b9e4f-0, compressionModel=COMPRESSED, dataPlaneStatus=null, extraProperties=null, error=null, errorDesc=null) timestamp 1569356100063)
```
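For reference, the producer limit is set through the Streams configuration roughly as follows (a trimmed-down sketch; the application id and bootstrap servers are placeholders, not our real setup):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProperties {
    static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "recording-requests"); // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative
        // The 524288-byte limit mentioned above, forwarded to the internal producer.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), 524288);
        return props;
    }
}
```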

This usually happens when the application is under load and a lot of messages are produced (several hundred or more per second). I am not sure whether max.request.size includes the size of the message headers, but each message may carry a few headers with a total size of less than 100 bytes.
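As far as I can tell from the client code, the check in KafkaProducer.doSend() is made against an upper-bound estimate of the serialized record size, and that estimate does include the headers. Here is a sketch of how one could reproduce it with the (semi-internal) client record classes; the key, payload size, and header below are made up:

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;

public class RecordSizeEstimate {
    public static void main(String[] args) {
        // Made-up stand-ins: a UUID key, a serialized Event payload, one small header.
        byte[] key = "4bc2eef4-ac1c-97bf-518b-4a32c38b9e4f".getBytes(StandardCharsets.UTF_8);
        byte[] value = new byte[2048];
        Header[] headers = { new RecordHeader("traceId", "abc123".getBytes(StandardCharsets.UTF_8)) };

        // Upper-bound estimate of the serialized record size, including key, value,
        // headers, and per-record overhead; the producer compares a value like this
        // against max.request.size.
        int estimate = AbstractRecords.estimateSizeInBytesUpperBound(
                RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, key, value, headers);
        System.out.println("estimated record size: " + estimate + " bytes (limit: 524288)");
    }
}
```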

```
org.apache.kafka.streams.errors.StreamsException: task [0_3] Abort sending since an error caught with a previous record (key 4bc2eef4-ac1c-97bf-518b-4a32c38b9e4f value Event(transactionId=88834013-28c3-405d-9f69-81089dfa9246, action=RECORDING_REQUESTED, dataCenter=NEWPORT, ccid=2455, resourceId=4bc2eef4-ac1c-97bf-518b-4a32c38b9e4f, channelId=721ab65b-5333-8c23-4a03-3fff869176c9, canonicalId=462a90a8-7e1e-71a5-4859-eb076e5397ba, seriesId=c8ff610c-77f6-713a-32c3-eac8f6e632fa, externalId=EP001890580021, scheduleDurationInSec=2160, scheduleStartTimeMillisecs=1569356100000, recordingCount=38, dataCenterPath=null, startIndex=0, relativeFolderPath=4bc2eef4-ac1c-97bf-518b-4a32c38b9e4f-0, compressionModel=COMPRESSED, dataPlaneStatus=null, extraProperties=null, error=null, errorDesc=null) timestamp 1569356100063) to topic datacenter due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 716830 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:138)   
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50)   
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:201)  
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:930)   
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)   
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:167)   
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)   
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)   
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)   
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)   
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)   
    at com.lnt.eg.kafka.RecordingRequestProcessor.lambda$processPayloadForDataCenter$0(RecordingRequestProcessor.java:116)   
    at java.util.ArrayList.forEach(ArrayList.java:1257)   
    at com.lnt.eg.kafka.RecordingRequestProcessor.processPayloadForDataCenter(RecordingRequestProcessor.java:116)   
    at com.lnt.eg.kafka.RecordingRequestProcessor.transform(RecordingRequestProcessor.java:85)   
    at com.lnt.eg.kafka.RecordingRequestProcessor.transform(RecordingRequestProcessor.java:27)   
    at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:47)   
    at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:36)   
    at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:56)   
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)   
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)   
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)   
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)   
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)   
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)   
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)   
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)   
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)   
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)   
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)   
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)   
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)   
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)   
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)   
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)   
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 716830 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
```
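In the meantime, we are considering a default.production.exception.handler so that a single oversized record does not kill the whole stream thread. A minimal sketch (whether silently dropping the record is acceptable depends on the use case):

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class SkipTooLargeHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // Skip only oversized records; fail fast on everything else.
        return exception instanceof RecordTooLargeException
                ? ProductionExceptionHandlerResponse.CONTINUE
                : ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }
}
```

It would be registered with props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, SkipTooLargeHandler.class). That only keeps the thread alive, though; it does not explain why the record grew to 716830 bytes in the first place.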


...