версия spark: 2.3
Потоковое приложение Spark выполняет потоковую передачу по пути hdfs
Dataset<Row> lines = spark
.readStream()
.format("text")
.load("path");
И после некоторых преобразований данных,для одного файла задание должно быть в успешном состоянии.
Список заданий добавлен для завершения задания и перемещает файл при его запуске.
@Override
public void onJobENd(SparkListenerJobEnd jobEnd) {
// Move source file to some other location which is finished processing.
}
Файлы успешно перемещены в другое место.Но в то же время (точная отметка времени) искры запускают следующий файл, не найденный исключением. Это происходит случайным образом и не может быть воспроизведено.Но случается часто
Даже если конкретное задание закончено, искра все равно как-то ссылается на файл.
Как удостовериться, что после завершения задания файл не передается по искру, и избежать появления этого файла, не найденного
Я обнаружил, чтона: здесь
SparkListenerJobEnd
DAGScheduler does cleanUpAfterSchedulerStop, handleTaskCompletion, failJobAndIndependentStages, and markMapStageJobAsFinished.
Исключение:
java.io.FileNotFoundException: File does not exist: <filename>
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1932)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1873)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1853)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1825)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:559)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:87)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:363)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteExc