Как реализовать выполнение процесса как реактивное `Observable [String] - PullRequest
0 голосов
/ 16 января 2019

Я хочу представить выполнение внешнего процесса как Observable[String], где String - строка из вывода процесса. Вот пример того, что я делаю, это сработало:

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

object TestSo {

  def main(args: Array[String]): Unit = {

    val lineStream = scala.sys.process.Process("python3 test.py").lineStream

    val lineStreamO: Observable[String] = Observable.fromIterator(Task(lineStream.iterator))
      .doOnNext(l => Task(println(l))) //logging
      .guarantee(Task(println("clean resources")))

    println(lineStreamO.toListL.runSyncUnsafe())
  }

}

Вы можете видеть, что процесс выдает новую строку каждую секунду. Но это неважно. Просто приведите полный пример, test.py:

from time import sleep
print(0, flush=True)
sleep(1)
print(1, flush=True)
sleep(1)
print(2, flush=True)
sleep(1)
print(3, flush=True)
sleep(1)
print(4, flush=True)

Выход:

0
1
2
3
4
5
clean resources
List(0, 1, 2, 3, 4, 5)

Задача :

Я хочу установить тайм-аут - если процесс зависает (например, sleep 100000), процесс должен быть остановлен после тайм-аута. Также, если процесс завершен или завершился неудачей, некоторые ресурсы должны быть очищены (guarantee в примере). Нулевой код выхода должен представлять ошибку.

Как реализовать выполнение процесса как Observable[String] с обработкой ошибок propper? rx-java решения приветствуются.

Ответы [ 2 ]

0 голосов
/ 31 мая 2019

Я реализовал выполнение процесса как реактивное rxjava2 Observable в маленькой библиотеке , которая обертывает NuProcess реактивным способом. Например:

PreparedStreams streams = builder.asStdInOut();

Single<NuProcess> started = streams.started();
Single<Exit> done = streams.waitDone();
Observable<byte[]> stdout = streams.stdOut();
Observer<byte[]> stdin = streams.stdIn();

done.subscribe();
0 голосов
/ 17 января 2019

Потребность в тайм-ауте заставит вас переписать основную часть логики lineStream. С другой стороны, при таком переписывании вы можете избежать промежуточного Iterator и напрямую вставить строки в Subject. Для логики тайм-аута вы можете использовать метод Monix timeoutOnSlowUpstream, но вам все равно придется обработать ошибку тайм-аута и закрыть запущенный процесс.

Также есть выбор, что делать с длинным выходом и несколькими подписчиками. В этом коде я решил использовать ограниченный буфер replayLimited. В зависимости от ваших потребностей вы можете выбрать другую стратегию. Вот эскиз решения:

object ProcessHelper {

  import scala.sys.process.{Process, BasicIO}
  import scala.concurrent.duration.FiniteDuration
  import monix.eval.Task
  import monix.execution.Scheduler
  import monix.reactive.subjects.ConcurrentSubject
  import monix.reactive.Observable

  private class FinishedFlagWrapper(var finished: Boolean = false)

  def buildProcessLinesObservable(cmd: String, timeout: FiniteDuration, bufferLines: Int = 100)(implicit scheduler: Scheduler): Observable[String] = {
    // works both as a holder for a mutable boolean var and as a synchronization lock
    // that is required to preserve semantics of a Subject, particularly
    // that onNext is never called after onError or onComplete
    val finished = new FinishedFlagWrapper()

    // whether you want here replayLimited or some other logic depends on your needs
    val subj = ConcurrentSubject.replayLimited[String](bufferLines)

    val proc = Process(cmd).run(BasicIO(withIn = false,
      line => finished.synchronized {
        if (!finished.finished)
          subj.onNext(line)
      }, None))

    // unfortunately we have to block a whole thread just to wait for the exit code
    val exitThread = new Thread(() => {
      try {
        val exitCode = proc.exitValue()
        finished.synchronized {
          if (!finished.finished) {
            finished.finished = true
            if (exitCode != 0) {
              subj.onError(new RuntimeException(s"Process '$cmd' has exited with $exitCode."))
            }
            else {
              subj.onComplete()
            }
          }
        }
      }
      catch {
        // ignore when this is a result of our timeout
        case e: InterruptedException => if(!finished.finished) e.printStackTrace()
      }
    }, "Process-exit-wait")
    exitThread.start()

    subj.timeoutOnSlowUpstream(timeout)
      .guarantee(Task(finished.synchronized {
        if (!finished.finished) {
          finished.finished = true
          proc.destroy()
          exitThread.interrupt()
        }
      }))
  }
}

Пример использования будет выглядеть примерно так:

def test(): Unit = {
  import monix.execution.Ack._
  import monix.reactive._
  import scala.concurrent._
  import scala.concurrent.duration._
  import monix.execution.Scheduler.Implicits.global


  val linesO = ProcessHelper.buildProcessLinesObservable("python3 test.py", 5 seconds, 2) // buffer is reduced to just 2 lines just for this example 

  linesO.subscribe(new Observer[String] {
    override def onNext(s: String): Future[Ack] = {
      println(s"Received '$s'")
      Future.successful(Continue)
    }

    override def onError(ex: Throwable): Unit = println(s"Error '$ex'")

    override def onComplete(): Unit = println("Complete")
  })

  try {
    println(linesO.toListL.runSyncUnsafe())
    println(linesO.toListL.runSyncUnsafe()) // second run will show only last 2 values because of the reduced buffer size
    println("Finish success")
  }
  catch {
    case e: Throwable => println("Failed with " + e)
  }
}
...