PublishSubject с котлин сопрограммы (поток) - PullRequest
3 голосов
/ 13 мая 2019

Я пытаюсь реорганизовать кусок кода из RX в сопрограммы, но, несмотря на все мои усилия, я думаю, что теряюсь.

Итак, я создал PublishSubject, и я отправлял на него сообщения, а также я ждал результатов. Это работало безупречно, но теперь я не уверен, как сделать то же самое с сопрограммами (потоками или каналами).

private val subject = PublishProcessor.create<Boolean>>()

...

fun someMethod(b: Boolean) {
    subject.onNext(b)
}

fun observe() {
    subject.debounce(500, TimeUnit.MILLISECONDS)
           .subscribe {
         // value received
    }
}

Так как мне нужен оператор debounce, я действительно хотел сделать то же самое с потоками, поэтому я создал канал, а затем попытался создать поток из этого канала и прослушать изменения, но я не получаю никаких результатов.

private val channel = Channel<Boolean>()

...

fun someMethod(b: Boolean) {
    channel.send(b)
}

fun observe() {
    flow {
         channel.consumeEach { value ->
            emit(value)
         }
    }.debounce(500, TimeUnit.MILLISECONDS)
    .onEach {
        // value received
    }
}

Что я не прав?

1 Ответ

2 голосов
/ 13 мая 2019

Flow - это холодный асинхронный поток, такой же как Obserable.

Все преобразования в потоке, такие как map и filter doне запускать сбор или выполнение потока, только операторы терминала (например, single) запускают его.

Метод onEach - это просто преобразование.Поэтому вы должны заменить его оператором потока терминала collect.Также вы можете использовать BroadcastChannel, чтобы иметь более чистый код:

private val channel = BroadcastChannel<Boolean>(1)

fun someMethod(b: Boolean) {
    channel.send(b)
}

fun observe() {
  channel
    .asFlow()
    .debounce(500, TimeUnit.MILLISECONDS)
    .collect {
        // value received
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...