Rx StartWith в методе asyn c без применения начального значения - PullRequest
1 голос
/ 21 февраля 2020

Я пытаюсь ввести начальное значение в поток RX с помощью метода StartWith:

public async Task<IObservable<Price>) Stream(Instrument instrumentDetails)
{
    var initialPrice = await _svc.GetSomeInitialPrice();

    var stream = _priceObserver.Stream
        .Where(o => o.Symbol == instrumentDetails.Symbol)
        .Select(o => GetPrice(o, instrumentDetails));

    stream.StartWith(initialPrice);

    return stream;
}

, однако метод asyn c из-за вызова для получения начального значения. и все равно он должен быть асин c на всем протяжении этого стека вызовов

Я считаю, что значение никогда не добавляется в начало. Я просто получаю остаток потока

, если я await метод StartWith, он никогда не возвращает

каких-либо идей, что я делаю неправильно

Ответы [ 2 ]

2 голосов
/ 21 февраля 2020

Методы IObservable не изменяют базовый объект - они возвращают новый. stream.StartWith(initialPrice) возвращает новую наблюдаемую, которую вы игнорируете, она ничего не делает для stream.

Вы должны написать это так:

stream = stream.StartWith(initialPrice);

Или:

var stream = _priceObserver.Stream
    .Where(o => o.Symbol == instrumentDetails.Symbol)
    .Select(o => GetPrice(o, instrumentDetails))
    .StartWith(initialPrice);

Примечание: если вы ожидаете наблюдаемое, оно будет ждать, пока наблюдаемое не будет завершено, т.е. когда оно выдает все свои значения и вызывает свой метод OnComplete. Обычно вы должны ждать наблюдаемого, которое, как вы знаете, будет выдавать только 1 значение, а затем завершать (например, запрос к удаленному серверу), потому что его ожидание вернет только последнее переданное значение. Поэтому, если ожидается, что ваш stream будет непрерывно выдавать значения, нет смысла его ждать.

0 голосов
/ 24 февраля 2020

Вам лучше избегать смешивания Task<> и Observable<>, как вы делаете. Если вы можете, просто наберите IObservable<>.

В вашем случае это довольно просто.

Просто попробуйте это:

public IObservable<Price> Stream(Instrument instrumentDetails)
    =>
        Observable
            .FromAsync(() => _svc.GetSomeInitialPrice())
            .SelectMany(x =>
                _priceObserver
                    .Stream
                    .Where(o => o.Symbol == instrumentDetails.Symbol)
                    .Select(o => GetPrice(o, instrumentDetails))
                    .StartWith(x));

Я проверил это с Basi c немного кода, и он работает просто отлично. Это также гарантирует, что новые подписчики будут иметь любые новые значения, которые _svc.GetSomeInitialPrice() может произвести со временем.

...