RxJava Альтернатива CompletableFuture - PullRequest
0 голосов
/ 08 июня 2018

Я новичок в Rx Java и пытаюсь найти решение моей проблемы.Я хочу вернуть Single для вызывающей стороны, но данные недоступны при вызове этого метода, они будут заполнены либо до того, как будет выполнен этот вызов, либо после того, как этот вызов сделан.Наивный пример того, что я пытаюсь сделать, приведен ниже.CompletableFuture частично решает его, но я ищу решение Rx, возможно, с противодавлением.

val receiver = Receiver()

class Receiver {
    var data = ""

    // this returns a single but does not have data, but will be available after call to onComplete
    request(): Single<Data> {
        return Single.fromFuture(completableFuture)
    }

    onNext(data: String) {
        data.append(data)
    }

    onComplete() {
        completableFuture.complete(data)
    }
}

class Processor {
    fun process() {
        receiver.onNext("1")
        receiver.onNext("2")
        receiver.onComplete()
    }
}

class Caller {
    fun call() {
        // This should get "12" result
        // Processor().process() can be called before or after caller subscibe
        receiver.request()..subscribe(...)
    }
}

1 Ответ

0 голосов
/ 08 июня 2018

Вместо CompletableFuture вы можете использовать PublishSubject.Чтобы иметь возможность предоставить значение до или после вызова подписки, вы также должны использовать .cache() на Single.Сложите все это так:

class Receiver {
    var data = ""
    val subject = PublishSubject.create<Data>()
    val single = Single.fromObservable(subject).cache()
    // this ensures there is at least one subscription when the event is published - otherwise because there is no subscriber, the event is lost, despite cache().
    val dummyObserver = single.subscribe()

    // this returns a single but does not have data, but will be available after call to onComplete
    fun request(): Single<Data> {
        return single
    }

    fun onNext(data: String) {
        // as before
    }

    fun onComplete() {
        subject.onNext(data)
        subject.onComplete(data)
    }
}
...