контрольно-пропускной пункт и производитель кафки ровно один раз - PullRequest
0 голосов
/ 22 февраля 2020

Когда я создаю производителя kafka ровно один раз, если я также использую контрольную точку, это приведет к такой проблеме:

java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1260)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1155)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1132)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1111)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.close(FlinkKafkaInternalProducer.java:150)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1093)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1031)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

Как я могу ее решить?

...