Создание Monix, наблюдаемого из NuProcess ByteBuffer - PullRequest
4 голосов
/ 22 января 2020

Итак, у меня есть текущий проект, который использует Java Process, и я пытаюсь заменить его на NuProcess (т.е. https://github.com/brettwooldridge/NuProcess). Для работы с STDOUT / STDERROR Java Process у вас есть InputStream, а поскольку Monix предоставил удобный метод взаимодействия Observable.fromInputStream, это позволило вам легко создать Observable[String] / Observable[Array[Byte]].

Однако проблема в том, что NuProcess не работает с InputStream, вместо этого он использует javas NIO ByteBuffer. Кроме того, он немного сложнее, поскольку использует механизм обработки событий, т. Е. Для прослушивания STDOUT / STDERROR в задаче Monix для NuProcess вы должны сделать что-то вроде

nuProcess.setProcessHandler(new NuProcessHandler {
  override def onStderr(buffer: ByteBuffer, closed: Boolean) = {
    if (!closed) {
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)
      // Do something else here
    }
  }

  override def onStdout(buffer: ByteBuffer, closed: Boolean) = {
    if (!closed) {
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)
      // Do something else here
    }
  }
})

Так что вопрос в том, как бы вы подключили это к наблюдаемому (то есть Observable[String] или Observable[Array[Byte]], не будучи неэффективным?

Обратите внимание, что я использую Monix 3.x

...