Как заблокировать ToObservable с помощью List <>? - PullRequest
3 голосов
/ 20 февраля 2011

Я впервые пробую RX, и у меня есть пара вопросов.

1) Есть ли лучший способ выполнить Async моей коллекции?

2) Мне нужно заблокировать поток, пока все асинхронные задачи не будут выполнены, как мне это сделать?

class Program
{

    internal class MyClass
    {
        private readonly List<int> _myData = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

        private readonly Random random = new Random();

        public int DoSomething(int j)
        {
            int i = random.Next(j * 1000) - (j * 200);
            i = i < 0 ? 1000 : i;
            Thread.Sleep(i);
            Console.WriteLine(j);
            return j;
        }

        public IObservable<int> DoSomethingAsync(int j)
        {
            return Observable.CreateWithDisposable<int>(
                o => Observable.ToAsync<int, int>(DoSomething)(j).Subscribe(o)
                );
        }

        public void CreateTasks()
        {
            _myData.ToObservable(Scheduler.NewThread).Subscribe(
            onNext: (i) => DoSomethingAsync(i).Subscribe(),
            onCompleted: () => Console.WriteLine("Completed")
                );
        }
    }

    static void Main(string[] args)
    {
        MyClass test = new MyClass();

        test.CreateTasks();

        Console.ReadKey(); 
    } 
}

(Примечание: я знаю, что мог бы использовать Observable.Range для своего списка Int, но в реальной программе мой список не имеет тип Int).

1 Ответ

3 голосов
/ 20 февраля 2011

Я бы, наверное, попробовал

public void CreateTasks()                
{                       
    _myData.ToObservable(Scheduler.NewThread)
        .SelectMany(i => Observable.Start(() => DoSomething(i)))
        .Subscribe(j => Console.WriteLine("j is {0}", j), 
                  () => Console.WriteLine("Completed"));       
} 

Итак, во-первых, я изменил DoSomethingAsync, чтобы он использовал Observable.Start.Observable.Start запустит метод DoSomething асинхронно и вернет значение через IObservable.OnNext после завершения метода.

Затем метод CreateTasks запускает каждый элемент в коллекции, как и прежде, но передает каждое значение в SelectMany, который продолжается вызовом метода DoSomethingAsync.В результате вы получите OnNext за каждый завершенный вызов DoSomethingAsync и OnComplete, когда все они будут завершены.

...