Ваша основная проблема в том, как преобразовать Observable[ByteBuffer]
в Observable[String]
, где каждая String
- это строка, верно?
Вы можете использовать метод bufferWithSelector(selector: Observable[S]): Observable[Seq[A]]
.Этот метод будет буферизовать Observable, пока селектор Observable не выделит элемент.
Я сделал небольшой пример, используя Int
s:
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val source = Observable.range(0, 1000, 1)
.delayOnNext(100.milliseconds)
val selector = source.filter(_ % 10 == 0)
val buffered = source.bufferWithSelector(selector)
.map(_.foldLeft("")((s, i) => s + i.toString)) // This folds the Seq[Int] into a String for display purposes
buffered.foreach(println)
Попробуйте!
Конечно, это имеет существенный недостаток: базовая наблюдаемая source
будет оценена дважды.Вы можете увидеть это, изменив приведенный выше пример:
// Start writing your ScalaFiddle code here
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val source = Observable.range(0, 1000, 1)
.delayOnNext(100.milliseconds)
.map {x => println(x); x} // <------------------
val selector = source.filter(_ % 10 == 0)
val buffered = source.bufferWithSelector(selector)
.map(_.foldLeft("")((s, i) => s + i.toString))
buffered.foreach(println)
Это напечатает каждое число дважды.
Чтобы это исправить, вы должны преобразовать source
Observable вhot Observable:
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val source = Observable.range(0, 1000, 1)
.delayOnNext(100.milliseconds)
.map {x => println(x); x}
.publish // <-----------------------------
// source is now a ConnectableObservable and will start emitting elements
// once you call source.connect()
val selector = source.filter(_ % 10 == 0)
val buffered = source.bufferWithSelector(selector)
.map(_.foldLeft("")((s, i) => s + i.toString))
buffered.foreach(println)
source.connect() // <---------------------------
Попробуйте!
Единственное, что вам нужно сделать, - это изменить селектор так, чтобы он генерировал элементы только при появлении перевода строки..
Я бы предложил разбить Observable[ByteBuffer]
на Observable[Byte]
сначала (используя flatMap
), чтобы избежать головной боли.