Загрузка файла по SFTP проходит успешно, но приложение не удаётся завершить - PullRequest
0 голосов
/ 04 марта 2020

Я загружаю файл на SFTP-сервер с помощью Alpakka. Приведённый ниже код успешно загружает файл, но мне не удаётся корректно завершить работу системы акторов (ActorSystem). Даже когда поток успешно завершается, возникает следующая ошибка:

[ERROR] [03/04/2020 12:40:26.996] [test-akka.stream.default-blocking-io-dispatcher-13] [akka://test/system/StreamSupervisor-0/flow-0-2-sFtpIOSink] Error during postStop in [akka.stream.alpakka.ftp.impl.FtpSourceFactory$$anon$4@3d555252]: Timeout expired
net.schmizz.sshj.connection.ConnectionException: Timeout expired
    at net.schmizz.sshj.connection.ConnectionException$1.chain(ConnectionException.java:32)
    at net.schmizz.sshj.connection.ConnectionException$1.chain(ConnectionException.java:26)
    at net.schmizz.concurrent.Promise.retrieve(Promise.java:139)
    at net.schmizz.concurrent.Event.await(Event.java:105)
    at net.schmizz.sshj.connection.channel.AbstractChannel.close(AbstractChannel.java:265)
    at net.schmizz.sshj.sftp.SFTPEngine.close(SFTPEngine.java:253)
    at net.schmizz.sshj.sftp.SFTPClient.close(SFTPClient.java:250)
    at akka.stream.alpakka.ftp.impl.SftpOperations$class.disconnect(SftpOperations.scala:50)
    at akka.stream.alpakka.ftp.impl.FtpLike$$anon$3.disconnect(FtpLike.scala:62)
    at akka.stream.alpakka.ftp.impl.FtpLike$$anon$3.disconnect(FtpLike.scala:62)
    at akka.stream.alpakka.ftp.impl.FtpGraphStageLogic$$anonfun$disconnect$1.apply(FtpGraphStageLogic.scala:67)
    at akka.stream.alpakka.ftp.impl.FtpGraphStageLogic$$anonfun$disconnect$1.apply(FtpGraphStageLogic.scala:67)
    at scala.Option.foreach(Option.scala:257)
    at akka.stream.alpakka.ftp.impl.FtpGraphStageLogic.disconnect(FtpGraphStageLogic.scala:67)
    at akka.stream.alpakka.ftp.impl.FtpGraphStageLogic.postStop(FtpGraphStageLogic.scala:49)
    at akka.stream.alpakka.ftp.impl.FtpIOSinkStage$$anon$2.postStop(FtpIOGraphStage.scala:218)
    at akka.stream.impl.fusing.GraphInterpreter.finalizeStage(GraphInterpreter.scala:579)
    at akka.stream.impl.fusing.GraphInterpreter.afterStageHasRun(GraphInterpreter.scala:560)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:380)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
    at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent$class.execute(ActorGraphInterpreter.scala:47)
    at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary$OnComplete.execute(ActorGraphInterpreter.scala:76)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:581)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:749)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:764)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:539)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:671)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
    at akka.actor.ActorCell.invoke(ActorCell.scala:581)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
    at akka.dispatch.Mailbox.run(Mailbox.scala:229)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Timeout expired
    ... 32 more

код:

// Actor system that hosts the stream; it is shut down explicitly once the upload is done.
implicit val system: ActorSystem = ActorSystem("test")
// Materializer used to run stream blueprints on the actor system above.
implicit val materializer: ActorMaterializer = ActorMaterializer()
// The system's default dispatcher doubles as the execution context for Future callbacks.
implicit val ec: scala.concurrent.ExecutionContextExecutor = system.dispatcher

// Connection settings for the target SFTP server.
val settings: SftpSettings = {
  // Resolve the configured host name once, up front.
  val serverAddress = InetAddress.getByName(host)
  SftpSettings
    .create(serverAddress)
    .withPort(port)
    .withCredentials(creds)
    // NOTE(review): strict host key checking is switched off here; confirm this is
    // acceptable for the target environment before using it outside of tests.
    .withStrictHostKeyChecking(false)
}

// Upload the local file to the remote path.
//
// `runWith(sink)` materializes the SINK's Future[IOResult], which completes only after
// the SFTP stage has finished writing. The previous `.to(sink).run()` kept the
// source-side value (the one produced by FileIO.fromPath), so the Future completed as
// soon as the local file had been read -- before the SFTP stage was done -- and any
// shutdown triggered from it raced with the SFTP stage's own cleanup.
val stream: Future[IOResult] =
  FileIO
    .fromPath(file)
    // Third argument kept as in the original call (presumably the "append" flag -- confirm).
    .runWith(Sftp.toPath(remoteFilePath, settings, false))

    // Report the outcome of the upload, then shut the actor system down.
    // Termination happens for BOTH outcomes: previously a failed upload only printed
    // the error and left the ActorSystem (and therefore the process) running.
    stream.onComplete { outcome =>
      outcome match {
        case Success(_) =>
          println(f"is completed? ${stream.isCompleted}")
        case Failure(t) =>
          println("An error has occurred: " + t.getMessage)
      }
      system.terminate()
    }
...