Как запустить метод finally (), когда все наблюдаемые асинхронные элементы завершают обработку? - PullRequest
1 голос
/ 28 октября 2019

Для того чтобы пользовательский интерфейс был более отзывчивым, я выводил элементы партиями. Проблема здесь в том, что OnFinally() вызывается до того, как завершится последний OutputItems().

    IObservable<IList<xx>> obs = Observable
     .Interval(TimeSpan.FromSeconds(.1), Scheduler.Default)
       .Zip(dirEnum.ToObservable(NewThreadScheduler.Default)
         .Buffer(100), (a, b) => b)
     .ObserveOn(syncContext).Finally(OnFinally);
    ...
    obs.Subscribe(async x => await OutputItems(x));

Есть ли способ вызвать OnFinally, когда все элементы отсутствуют?

Ответы [ 2 ]

1 голос
/ 29 октября 2019

Проецируйте буферы непосредственно на ваши OutPutItems и не используйте await внутри подписки

IObservable<IList<xx>> obs = Observable
     .Interval(TimeSpan.FromSeconds(.1), Scheduler.Default)
       .Zip(dirEnum.ToObservable(NewThreadScheduler.Default)
         .Buffer(100), (a, b) => b)
     .ObserveOn(syncContext).Finally(OnFinally)
     .SelectManay(X=>OutputItmes().ToObservable())
     .Finally(OnFinally);
    ...
    obs.Subscribe();
0 голосов
/ 29 октября 2019

Метод Subscribe не принимает асинхронные делегаты, поэтому ваша лямбда является асинхронной недействительной. Асинхронные пустые методы не наблюдаемы или недоступны, а их исключения не поддаются отслеживанию (они не обрабатываются и приводят к краху процесса). Решение состоит в том, чтобы проецировать ваши партии на Task с партиями, а затем обратно на партии с помощью метода Merge. Когда вы Subscribe, вы получите результаты заданий в том порядке, в котором они были выполнены (скорее всего, не в их первоначальном порядке). Если вас не интересуют результаты, просто позвоните Subscribe без аргументов.

IObservable<IList<xx>> obs = Observable
    .Interval(TimeSpan.FromSeconds(.1), Scheduler.Default)
    .Zip(dirEnum.ToObservable(NewThreadScheduler.Default)
    .Buffer(100), (a, b) => b)
    .Select(x => OutputItemsAsync(x))
    .Merge()
    .ObserveOn(syncContext)
    .Finally(OnFinally);

obs.Subscribe();
...