У меня есть приложение, которое использует Apache Flink Streaming Framework, а также работает с источниками и приемниками kafka.
Во время обработки данных я случайным образом получу исключения, подобные этому:
09:59:16.087 ERROR o.a.k.clients.producer.KafkaProducer - Interrupted while joining ioThread
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[na:1.8.0_51]
at java.lang.Thread.join(Thread.java:1253) [na:1.8.0_51]
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031) [kafka-clients-0.11.0.2.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010) [kafka-clients-0.11.0.2.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989) [kafka-clients-0.11.0.2.jar:na]
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.close(FlinkKafkaProducer.java:168) [flink-connector-kafka-0.11_2.11-1.6.3.jar:1.6.3]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.close(FlinkKafkaProducer011.java:663) [flink-connector-kafka-0.11_2.11-1.6.3.jar:1.6.3]
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) [flink-core-1.6.3.jar:1.6.3]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) [flink-streaming-java_2.11-1.6.3.jar:1.6.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477) [flink-streaming-java_2.11-1.6.3.jar:1.6.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378) [flink-streaming-java_2.11-1.6.3.jar:1.6.3]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) [flink-runtime_2.11-1.6.3.jar:1.6.3]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
09:59:16.087 ERROR o.a.f.s.runtime.tasks.StreamTask - Error during disposal of stream operator.
org.apache.kafka.common.KafkaException: Failed to close kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1062) ~[kafka-clients-0.11.0.2.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010) ~[kafka-clients-0.11.0.2.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989) ~[kafka-clients-0.11.0.2.jar:na]
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.close(FlinkKafkaProducer.java:168) ~[flink-connector-kafka-0.11_2.11-1.6.3.jar:1.6.3]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.close(FlinkKafkaProducer011.java:663) ~[flink-connector-kafka-0.11_2.11-1.6.3.jar:1.6.3]
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-core-1.6.3.jar:1.6.3]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-streaming-java_2.11-1.6.3.jar:1.6.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477) [flink-streaming-java_2.11-1.6.3.jar:1.6.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378) [flink-streaming-java_2.11-1.6.3.jar:1.6.3]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) [flink-runtime_2.11-1.6.3.jar:1.6.3]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
Однако я сомневаюсь, что фактическая проблема имеет какое-либо отношение к производителю кафки.У меня было то же исключение некоторое время назад (см .: этот пост ), и я смог точно определить проблему, построив мое (теперь довольно сложное) приложение на более мелкие блоки, пока оно, наконец, не выдало исключение из в моем коде .На этот раз я не смог найти ошибку таким образом, и поэтому я заблудился и не знаю, как это расследовать.
Итак, вопрос в том, как мне найти источник этих исключений?Есть ли рекомендуемый способ отладки приложения flink для поиска ошибок такого рода?