Как определить ConcatMap в Rx.Net? - PullRequest
0 голосов
/ 21 июня 2019

Rx.Net на данный момент не имеет эквивалента concatMap, но наверняка есть способ получить аналогичное поведение, учитывая доступную функциональность.У меня сейчас observable.SelectMany(x => ProcessItemAsync(item).ToObservable()), где ProcessItemAsync - это асинхронный метод, который я хотел бы выполнить по порядку, а не для всех элементов сразу.

Если я правильно понимаю Rx, observable.ConcatMap(x => ProcessItemAsync(item).ToObservable()) должен сделать это, но в настоящее время его нет в Rx.Net, так что может быть еще одним способом достижения того же поведения?

У меня может быть несколько источников для наблюдаемых, и каждый из этих источников можетвыполнить ProcessItemAsync параллельно, это должно быть только в потоке, чтобы сохранить порядок ввода / вывода, поэтому я не могу заблокировать его на ProcessItemAsync

1 Ответ

0 голосов
/ 10 июля 2019

РЕДАКТИРОВАТЬ: нашел гораздо более простое решение здесь: Ответ на Reactive Extensions SelectMany и Concat

Вставьте ниже в LINQPad , чтобы попробовать

public void Main()
{
    // Generate 100 items and 'process' async
    Observable
        .Range(0, 100)
        .Select(x => ProcessItemAsync(x))
        .Concat()
        .Dump();
}

private readonly Random _rnd = new Random();

public async Task<int> ProcessItemAsync(int item)
{
    await Task.Delay(_rnd.Next(0, 1000));
    return item;
}

(В оригинальном посте я создал метод расширения, который выбирал / отображал данный асинхронный Func. Затем я создал класс, в котором результаты ожидались, индексировались и сохранялись для выдачи в порядке.)

...