Как прочитать ответ как Observable [String] с помощью STTP - PullRequest
0 голосов
/ 04 февраля 2019

Я использую sttp клиент.Я хочу интерпретировать ответ как строки, разделенные на строки, например Observable[String]

Здесь STI потокового API:

import java.nio.ByteBuffer

import com.softwaremill.sttp._
import com.softwaremill.sttp.okhttp.monix.OkHttpMonixBackend
import monix.eval.Task
import monix.reactive.Observable

implicit val sttpBackend = OkHttpMonixBackend()

val res: Task[Response[Observable[ByteBuffer]]] = sttp
  .post(uri"someUri")
  .response(asStream[Observable[ByteBuffer]])
  .send()

Так, как я могу получить Observable[String]?

Вот несколько идей:

1. Есть ли простой способ split Наблюдение по линиям?
2. Или, может бытьЯ могу получить необработанный InputStream из ответа, поэтому я могу легко разделить его, но я не могу найти способ использовать что-то вроде asStream[InputStream]
3. Или, может быть, просто использовать http-бэкэнд без использования sttp слой?

1 Ответ

0 голосов
/ 05 февраля 2019

Ваша основная проблема в том, как преобразовать Observable[ByteBuffer] в Observable[String], где каждая String - это строка, верно?

Вы можете использовать метод bufferWithSelector(selector: Observable[S]): Observable[Seq[A]].Этот метод будет буферизовать Observable, пока селектор Observable не выделит элемент.

Я сделал небольшой пример, используя Int s:

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString)) // This folds the Seq[Int] into a String for display purposes

buffered.foreach(println)

Попробуйте!


Конечно, это имеет существенный недостаток: базовая наблюдаемая source будет оценена дважды.Вы можете увидеть это, изменив приведенный выше пример:

// Start writing your ScalaFiddle code here

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)
  .map {x => println(x); x}  // <------------------

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString))

buffered.foreach(println)

Это напечатает каждое число дважды.


Чтобы это исправить, вы должны преобразовать source Observable вhot Observable:

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)
  .map {x => println(x); x}
  .publish // <-----------------------------

// source is now a ConnectableObservable and will start emitting elements
// once you call source.connect()

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString))

buffered.foreach(println)

source.connect() // <---------------------------

Попробуйте!

Единственное, что вам нужно сделать, - это изменить селектор так, чтобы он генерировал элементы только при появлении перевода строки..

Я бы предложил разбить Observable[ByteBuffer] на Observable[Byte] сначала (используя flatMap), чтобы избежать головной боли.

...