Apache Flink: как обрабатывать исключения в раковине Кафки? - PullRequest
0 голосов
/ 13 сентября 2018

У меня есть задание Flink, которое записывает данные в Kafka. В теме Kafka максимальный размер сообщения установлен на 5 МБ, поэтому, если я попытаюсь записать любую запись размером более 5 МБ, она выдаст следующее исключение и завершит работу.

java.lang.Exception: Failed to send data to Kafka: The request included a message larger than the max message size the server will accept.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:350)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

Теперь я настроил контрольные точки в моей работе, поэтому, если работа не удалась, она снова запускается. Проблема в том, что при каждом перезапуске происходит сбой для одной и той же записи, и происходит бесконечный цикл сбоев и перезапусков. Есть ли способ обработать это исключение Кафки в моем коде, чтобы он не сбивал всю работу?

1 Ответ

0 голосов
/ 13 сентября 2018

Возможно, вы могли бы ввести фильтр перед приемником Kafka, который бы обнаруживал и отфильтровывал записи, которые будут слишком большими. Немного хакерский, но это может быть легко. В противном случае я бы посмотрел на расширение FlinkKafkaProducer010, чтобы можно было обработать исключение.

...