Since we migrated to Kafka client 2.3 (previously 1.1), we intermittently get a `RecordTooLargeException` when sending a message to a Kafka topic. The producer's `max.request.size` is 524288. As you can see below, the `Event` object is nowhere near the `max.request.size` limit:
(key 4bc2eef4-ac1c-97bf-518b-4a32c38b9e4f value Event(transactionId=88834013-28c3-405d-9f69-81089dfa9246, action=RECORDING_REQUESTED, dataCenter=NEWPORT, ccid=2455, resourceId=4bc2eef4-ac1c-97bf-518b-4a32c38b9e4f, channelId=721ab65b-5333-8c23-4a03-3fff869176c9, canonicalId=462a90a8-7e1e-71a5-4859-eb076e5397ba, seriesId=c8ff610c-77f6-713a-32c3-eac8f6e632fa, externalId=EP001890580021, scheduleDurationInSec=2160, scheduleStartTimeMillisecs=1569356100000, recordingCount=38, dataCenterPath=null, startIndex=0, relativeFolderPath=4bc2eef4-ac1c-97bf-518b-4a32c38b9e4f-0, compressionModel=COMPRESSED, dataPlaneStatus=null, extraProperties=null, error=null, errorDesc=null) timestamp 1569356100063)
This usually happens when the application is under load and producing many messages (a few hundred or more per second). I'm not sure whether `max.request.size` includes the size of the message headers, but each message may carry a few headers with a total size of less than 100 bytes.
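To make the size comparison concrete, here is a rough back-of-the-envelope sketch in plain Java. It assumes headers count toward the record size and adds a fixed allowance for framing. The class `RecordSizeSketch`, the methods `roughSize` and `exceeds`, the 100-byte allowance, the 1 KB stand-in value, and the `trace-id` header are all hypothetical and chosen for illustration; this is not the formula the Kafka producer uses internally.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class RecordSizeSketch {
    // Hypothetical allowance for per-record framing; NOT Kafka's real overhead figure.
    static final int ASSUMED_OVERHEAD_BYTES = 100;

    // Rough size: serialized key + serialized value + header keys and values + allowance.
    static int roughSize(byte[] key, byte[] value, Map<String, byte[]> headers) {
        int size = ASSUMED_OVERHEAD_BYTES;
        size += (key == null) ? 0 : key.length;
        size += (value == null) ? 0 : value.length;
        for (Map.Entry<String, byte[]> header : headers.entrySet()) {
            size += header.getKey().getBytes(StandardCharsets.UTF_8).length;
            size += (header.getValue() == null) ? 0 : header.getValue().length;
        }
        return size;
    }

    // True when the given size is above the configured limit.
    static boolean exceeds(int sizeInBytes, int maxRequestSize) {
        return sizeInBytes > maxRequestSize;
    }

    public static void main(String[] args) {
        int maxRequestSize = 524288; // producer max.request.size from the question

        byte[] key = "4bc2eef4-ac1c-97bf-518b-4a32c38b9e4f".getBytes(StandardCharsets.UTF_8);
        byte[] value = new byte[1024]; // assumption: stand-in for a serialized Event of about 1 KB
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("trace-id", new byte[36]); // hypothetical header, well under 100 bytes

        int size = roughSize(key, value, headers);
        System.out.println("rough size = " + size + ", too large = " + exceeds(size, maxRequestSize));

        // The size reported in the exception, compared against the same limit.
        System.out.println("reported size too large = " + exceeds(716830, maxRequestSize));
    }
}
```

Under these assumptions a single record of this shape stays in the low kilobytes, whereas the 716830 bytes reported in the exception is above the 524288-byte limit, which is the discrepancy this question is about.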
org.apache.kafka.streams.errors.StreamsException: task [0_3] Abort sending since an error caught with a previous record (key 4bc2eef4-ac1c-97bf-518b-4a32c38b9e4f value Event(transactionId=88834013-28c3-405d-9f69-81089dfa9246, action=RECORDING_REQUESTED, dataCenter=NEWPORT, ccid=2455, resourceId=4bc2eef4-ac1c-97bf-518b-4a32c38b9e4f, channelId=721ab65b-5333-8c23-4a03-3fff869176c9, canonicalId=462a90a8-7e1e-71a5-4859-eb076e5397ba, seriesId=c8ff610c-77f6-713a-32c3-eac8f6e632fa, externalId=EP001890580021, scheduleDurationInSec=2160, scheduleStartTimeMillisecs=1569356100000, recordingCount=38, dataCenterPath=null, startIndex=0, relativeFolderPath=4bc2eef4-ac1c-97bf-518b-4a32c38b9e4f-0, compressionModel=COMPRESSED, dataPlaneStatus=null, extraProperties=null, error=null, errorDesc=null) timestamp 1569356100063) to topic datacenter due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 716830 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:138)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:201)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:930)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:167)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at com.lnt.eg.kafka.RecordingRequestProcessor.lambda$processPayloadForDataCenter$0(RecordingRequestProcessor.java:116)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.lnt.eg.kafka.RecordingRequestProcessor.processPayloadForDataCenter(RecordingRequestProcessor.java:116)
at com.lnt.eg.kafka.RecordingRequestProcessor.transform(RecordingRequestProcessor.java:85)
at com.lnt.eg.kafka.RecordingRequestProcessor.transform(RecordingRequestProcessor.java:27)
at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:47)
at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:36)
at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:56)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 716830 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.