IOException when a TaskManager restores RocksDB state from HDFS
22 September 2019

I'm trying to submit a Flink job (version 1.8.2) that uses the RocksDB state backend to my own YARN cluster (Hadoop version 2.6.0-cdh5.7.3). The job always fails after running for a few hours, and some TaskManagers lose their connection. When I dug into the YARN logs, I found the following exception in a TaskManager (the full log is here). Can anyone tell me how to solve this problem?

PS: I built Flink against my Hadoop version.

2019-09-21 10:42:18,258 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Exception while restoring operator state backend for StreamSource_bc764cd8ddf7a0cff126f51c16239658_(235/300) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:537)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:255)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:740)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Stream closed
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:877)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:942)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:742)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:84)
    at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51)
    at java.io.DataInputStream.readUnsignedShort(DataInputStream.java:337)
    at java.io.DataInputStream.readUTF(DataInputStream.java:589)
    at java.io.DataInputStream.readUTF(DataInputStream.java:564)
    at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:202)
    at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 11 more
...
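In the innermost frames of the trace, Flink reads the operator state meta info with `DataInputStream.readUTF` over the HDFS input stream. The read fails inside `DFSInputStream` with `Stream closed`. The JDK-only sketch below shows the same shape of failure: `readUTF` run against a stream that has already been closed raises `IOException: Stream closed`, while the same bytes read fine from an open stream. It is a hedged illustration under stated assumptions, not Flink or Hadoop code. The class and method names are hypothetical. `BufferedInputStream` stands in for Hadoop's `DFSInputStream`, and a single UTF string stands in for Flink's real meta-info format.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class StreamClosedDemo {

    // Serialize one UTF string: a hypothetical stand-in for the state meta info
    // that Flink reads during restore (the real format is more involved).
    static byte[] writeMetaInfo(String stateName) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(stateName);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    // Normal case: the stream is still open while readUTF runs.
    static String readWhileOpen(byte[] data) {
        try (DataInputStream in = new DataInputStream(
                new BufferedInputStream(new ByteArrayInputStream(data)))) {
            return in.readUTF();
        } catch (IOException e) {
            return "IOException: " + e.getMessage();
        }
    }

    // Failure case: the underlying stream (standing in for DFSInputStream)
    // is closed before readUTF gets to it.
    static String readAfterClose(byte[] data) {
        BufferedInputStream underlying =
                new BufferedInputStream(new ByteArrayInputStream(data));
        DataInputStream in = new DataInputStream(underlying);
        try {
            underlying.close();
            return in.readUTF(); // readUnsignedShort -> read on a closed stream
        } catch (IOException e) {
            return "IOException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        byte[] data = writeMetaInfo("my-operator-state");
        System.out.println("open:   " + readWhileOpen(data));
        System.out.println("closed: " + readAfterClose(data));
    }
}
```

Running `main` prints `open:   my-operator-state` followed by `closed: IOException: Stream closed`. Under these assumptions, the message points at the stream having been closed while the restore was still reading it. It does not point at unreadable bytes in the state file.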