Задержка оператора в rxjava2 - PullRequest
0 голосов
/ 21 февраля 2019

Я новичок в rxjava2.Когда я прочитал об этом книгу, у меня есть кое-что, чего я не понимаю о задержке оператора.

Мы можем отложить выбросы с помощью оператора delay ().Он будет удерживать любые полученные выбросы и задерживать каждое из них на указанный период времени.Если бы мы хотели задержать выбросы на три секунды, мы могли бы сделать это следующим образом:

 public static void main(String[] args) {

    Observable.just("Alpha", "Beta", "Gamma" ,"Delta",
            "Epsilon")
            .delay(3000, TimeUnit.SECONDS)
            .subscribe(s -> System.out.println("Received: " + s));

    sleep(3000);
}

public static void sleep(long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

Вывод предыдущего фрагмента кода выглядит следующим образом: Beta Alpha Gamma Delta Epsilon

Я думаю, что вывод только «Альфа», потому что они сказали

Поскольку delay () работает на другом планировщике (таком как Observable.interval ()), нам нужно использовать sleep ()способ сохранить приложение достаточно долго, чтобы увидеть это.Каждое излучение будет задержано на три секунды

с задержкой 3 с, я думаю, что есть излучение как "Альфа", но оно излучает все излучения в наблюдаемых.

Ответы [ 3 ]

0 голосов
/ 21 февраля 2019

Может быть, это вы хотели?

val source1 = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

val source2 = Observable.interval(3000, TimeUnit.MILLISECONDS)

val observable = Observable.zip(source1, source2, object : BiFunction<String, Long, Any> {
    override fun apply(t1: String, t2: Long): Any {
        Log.d("Sometag", "Received $t1")
        return "Something"
    }
}).subscribe()

Выход

2019-02-21 13:40:15.502  D/Sometag: Received Alpha
2019-02-21 13:40:18.502  D/Sometag: Received Beta
2019-02-21 13:40:21.502  D/Sometag: Received Gamma
2019-02-21 13:40:24.502  D/Sometag: Received Delta
2019-02-21 13:40:27.502  D/Sometag: Received Epsilon
0 голосов
/ 21 февраля 2019

в вашем случае операция задержки будет только задерживать весь поток на 3 секунды, и все элементы будут немедленно отправлены, см. Больше в документации по rx http://reactivex.io/documentation/operators/delay.html

если вы хотите задержать каждый элемент на 3 секунды, вы можетесделать это так:

       Observable.fromArray("Alpha", "Beta", "Gamma", "Delta",
            "Epsilon")
            .concatMap(s -> Observable.just(s).delay(3, TimeUnit.SECONDS))
            .subscribe(s -> System.out.println("Received: " + s));
0 голосов
/ 21 февраля 2019

В соответствии с Документацией , оператор задержки «задерживает» выбросы в течение заданного времени.

Таким образом, он будет отображать «Альфа», «Бета», «Гамма», «Дельта»,«Эпсилон», а не только «Альфа».

println("started")

val subscribe = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .delay(3, TimeUnit.SECONDS)
        .subscribe { s -> println("Received: $s") }

этот код произведет все пять строк, выпущенных через 3 с.

  • 2019-02-21 18: 02: 30.285 I: начало
  • 2019-02-21 18: 02: 33.459 I: получено: альфа
  • 2019-02-21 18: 02: 33.464 I: получено: бета
  • 2019-02-21 18: 02: 33.466 I: Получено: Гамма
  • 2019-02-21 18: 02: 33.467 I: Получено: Дельта
  • 2019-02-21 18:02: 33.469 I: Получено: Epsilon
...