Я хочу представить выполнение внешнего процесса как Observable[String]
, где String
- строка из вывода процесса. Вот пример того, что я делаю, это сработало:
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
object TestSo {
def main(args: Array[String]): Unit = {
val lineStream = scala.sys.process.Process("python3 test.py").lineStream
val lineStreamO: Observable[String] = Observable.fromIterator(Task(lineStream.iterator))
.doOnNext(l => Task(println(l))) //logging
.guarantee(Task(println("clean resources")))
println(lineStreamO.toListL.runSyncUnsafe())
}
}
Вы можете видеть, что процесс выдает новую строку каждую секунду. Но это неважно. Просто приведите полный пример, test.py
:
from time import sleep
print(0, flush=True)
sleep(1)
print(1, flush=True)
sleep(1)
print(2, flush=True)
sleep(1)
print(3, flush=True)
sleep(1)
print(4, flush=True)
Выход:
0
1
2
3
4
5
clean resources
List(0, 1, 2, 3, 4, 5)
Задача :
Я хочу установить тайм-аут - если процесс зависает (например, sleep 100000
), процесс должен быть остановлен после тайм-аута. Также, если процесс завершен или завершился неудачей, некоторые ресурсы должны быть очищены (guarantee
в примере). Нулевой код выхода должен представлять ошибку.
Как реализовать выполнение процесса как Observable[String]
с обработкой ошибок propper? rx-java
решения приветствуются.