Я написал простое приложение для подсчёта слов на Flink 1.7.2, использующее Kafka 2.2 и как источник (consumer), и как приёмник (producer). Для продюсера Kafka я использую семантику «ровно один раз» (exactly-once), для обработки с сохранением состояния — KeyedProcessFunction, для хранения состояния — MapState, а в качестве бэкенда состояния — RocksDB с инкрементальными контрольными точками.
Приложение нормально работает при запуске из IntelliJ, но когда я отправляю его в свой локальный кластер Flink, я получаю исключение AsynchronousException, и задание Flink перезапускается каждые 0–20 секунд. Кто-нибудь сталкивался с этой проблемой раньше? Может быть, я что-то упускаю в конфигурации?
Вот мой код:
/**
 * Process function for a keyed stream of words that emits a running count
 * for every element it receives.
 *
 * Counts are kept in a `MapState` registered under the name "wordCountState".
 * For each input word the stored count (or 0 when the word has no entry yet)
 * is incremented by one, written back to the state and emitted downstream as
 * a `(word, count)` pair.
 */
class KeyedProcFuncWordCount extends KeyedProcessFunction[String, String, (String, Int)] {

  // Assigned in open(); the state handle is only available from the runtime context.
  private var wordCounts: MapState[String, Int] = _

  /** Registers the map state descriptor and stores the resulting state handle. */
  override def open(parameters: Configuration): Unit = {
    val descriptor = new MapStateDescriptor[String, Int](
      "wordCountState",
      createTypeInformation[String],
      createTypeInformation[Int])
    wordCounts = getRuntimeContext.getMapState(descriptor)
  }

  /**
   * Increments the count stored for `value` and emits the updated pair.
   *
   * @param value the incoming word
   * @param ctx   the process function context (not used here)
   * @param out   collector receiving the `(word, newCount)` tuple
   */
  override def processElement(
      value: String,
      ctx: KeyedProcessFunction[String, String, (String, Int)]#Context,
      out: Collector[(String, Int)]): Unit = {
    // A word without an entry starts from zero.
    val previous = if (!wordCounts.contains(value)) 0 else wordCounts.get(value)
    val updated = previous + 1
    wordCounts.put(value, updated)
    out.collect(value -> updated)
  }
}
/**
 * Entry point of a streaming word count job: reads words from the Kafka topic
 * [[inTopic]], counts them with [[KeyedProcFuncWordCount]] and writes the
 * stringified `(word, count)` pairs to [[outTopic]] using a transactional
 * (exactly-once) Kafka producer.
 *
 * NOTE(review): a `NoSuchMethodError` raised from inside `FlinkKafkaProducer011`
 * while a checkpoint is materialized (e.g. a missing
 * `SimpleTypeSerializerSnapshot.<init>(Supplier)`) is a binary incompatibility,
 * not a logic error in this job. It suggests the Kafka connector jar bundled
 * with the job was built for a different Flink version than the one the
 * cluster runs. Confirm that every Flink dependency matches the cluster version.
 */
object KafkaProcFuncWordCount {
  val bootstrapServers = "localhost:9092"
  val inTopic = "test"
  val outTopic = "test-out"

  // Checkpoint interval in milliseconds.
  private val checkpointIntervalMs = 30000L

  // Transaction timeout for the exactly-once producer, in milliseconds.
  // Per the Flink Kafka connector documentation, FlinkKafkaProducer011 falls
  // back to 1 hour when this property is unset, while Kafka brokers cap
  // transactions at `transaction.max.timeout.ms` (15 minutes by default) and
  // reject producers that ask for more. 15 minutes works with a stock broker.
  private val transactionTimeoutMs = "900000"

  // Size of the Kafka producer pool passed to FlinkKafkaProducer011.
  private val kafkaProducersPoolSize = 5

  /** Builds and runs the job; `args` is not used. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(checkpointIntervalMs)
    // Second constructor argument enables incremental checkpointing.
    env.setStateBackend(new RocksDBStateBackend("file:///tmp/data/db.rdb", true))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val runningCounts = env
      .addSource(buildConsumer())
      // The elements are already strings, so the word itself is the key.
      .keyBy(word => word)
      .process(new KeyedProcFuncWordCount)
      .map(_.toString)

    runningCounts.addSink(buildProducer())

    env.execute("KafkaProcFuncWordCount")
  }

  /** Creates the Kafka source reading only committed records from [[inTopic]]. */
  private def buildConsumer(): FlinkKafkaConsumer011[String] = {
    val consumerProps = new Properties
    consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "KafkaProcFuncWordCount")
    consumerProps.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
    new FlinkKafkaConsumer011[String](inTopic, new SimpleStringSchema, consumerProps)
  }

  /** Creates the exactly-once Kafka sink writing to [[outTopic]]. */
  private def buildProducer(): FlinkKafkaProducer011[String] = {
    val producerProps = new Properties
    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    producerProps.setProperty(ProducerConfig.RETRIES_CONFIG, "2147483647")
    producerProps.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
    producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "all")
    producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    // Keep the transaction timeout within the broker's default limit (see above).
    producerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs)

    new FlinkKafkaProducer011[String](
      outTopic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema),
      producerProps,
      Optional.of(new FlinkFixedPartitioner[String]),
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
      kafkaProducersPoolSize
    )
  }
}
Вот фрагмент журнала Flink TaskExecutor, который постоянно повторяется:
2019-07-05 14:05:47,548 INFO org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer - Flushing new partitions
2019-07-05 14:05:47,552 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011 - Starting FlinkKafkaProducer (1/1) to produce into default topic test-out
2019-07-05 14:05:47,775 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally KeyedProcess -> Map -> Sink: Unnamed (1/1) (f61d24c993f400394eaa028981a26bfe).
2019-07-05 14:05:47,776 INFO org.apache.flink.runtime.taskmanager.Task - KeyedProcess -> Map -> Sink: Unnamed (1/1) (f61d24c993f400394eaa028981a26bfe) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedProcess -> Map -> Sink: Unnamed (1/1).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedProcess -> Map -> Sink: Unnamed (1/1).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot.<init>(Ljava/util/function/Supplier;)V
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot.<init>(Ljava/util/function/Supplier;)V
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$TransactionStateSerializer$TransactionStateSerializerSnapshot.<init>(FlinkKafkaProducer011.java:1244)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$TransactionStateSerializer.snapshotConfiguration(FlinkKafkaProducer011.java:1235)
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializerConfigSnapshot.<init>(TwoPhaseCommitSinkFunction.java:847)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.snapshotConfiguration(TwoPhaseCommitSinkFunction.java:792)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.snapshotConfiguration(TwoPhaseCommitSinkFunction.java:615)
at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.computeSnapshot(RegisteredOperatorStateBackendMetaInfo.java:170)
at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.snapshot(RegisteredOperatorStateBackendMetaInfo.java:103)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:711)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Большое спасибо заранее за вашу помощь.