Как использовать Observable.ToAsync с IEnumerable - PullRequest
1 голос
/ 16 августа 2011

У меня возникла проблема с Rx.Мне нужно, чтобы после обработки каждого элемента в новом потоке результат отправлялся в основной поток.Я сделал это другим способом.Как я могу решить эту задачу с Rx?Вот код:

 Observable.ToAsync<string, IEnumerable<UserState>>(Run)(path)
 .ObserveOnDispatcher<IEnumerable<UserState>>()
 .Subscribe(
(o) =>
{   // need to run in Main Thread
    foreach (var item in o)
    {
     WriteLog(item.StatusCode, item.Url);
    }                        
},
(ex) =>{        },
() =>{      } );

   // need to run in New Thread
   private IEnumerable<UserState> Run(string sitemap)
   {
 ....
 foreach (var url in urls)
 {
    var result = new UserState
    {
        Url = url.Value,
        TotalItems = urls.Count
    };
    ....
    yield return result;
 }
   }

Ответы [ 2 ]

1 голос
/ 18 августа 2011

Вы хотите сгенерировать IEnumerable в каком-либо другом фоновом потоке, а затем обработать каждый пользовательский объект в этом перечисляемом в основном потоке пользовательского интерфейса. Если это понимание верно, вы можете сделать что-то вроде этого:

var users = Run(path); //NOTE: This doesn't execute your run method yet, Run will only execute when you start enumerating the users values
users.ToObservable(System.Concurrency.Scheduler.ThreadPool) //The enumerator will be scheduled on separate thread
.ObserveOn(frm) //Observe on UI thread of win form
.Subscribe(s => {}) //This will run in UI thread for each user object
1 голос
/ 17 августа 2011

Было бы хорошо, если бы вы описали возникшую проблему.

На данный момент я могу вам сказать, что у вас есть пара потенциальных проблем.

Во-первых, использование ObserveOnDispatcher предназначено для работы с System.Windows.Threading.Dispatcher (обычно создается с WPF). Если вы выполняете свой код вне WPF, это фактически означает «текущий поток», и это может привести к невозможности выполнения вашей подписки, если текущий поток занят. Другими словами, вы можете создать тупик.

Я запустил ваш код как в WPF, так и в LINQPad, и он хорошо работал в WPF, но заблокирован в LINQPad. Если я наблюдал за другим потоком, то он работал нормально в LINQPad и не работал в WPF.

Во-вторых, вы превращаете метод итератора в наблюдаемую асинхронность, и это не сработает, как вы ожидаете. Итератор на самом деле не запускает никакого кода, пока вы не выполните итерацию по перечисляемому. По сути, вы возвращаетесь из Run почти мгновенно и выполняете только тело вашего Run метода в коде Subscribe - и это не тот поток!

То, что вам нужно сделать, это принудительно выполнить перечисляемый - как минимум - изменить ваш код, чтобы он выглядел так:

private UserState[] Run(string sitemap)
{
    ...
    Func</* url type */, UserState> create = url =>
    {
        var result = new UserState
        {
            Url = url.Value,
            TotalItems = urls.Count
        };
        ....
        return result;
    };
    return (from url in urls select create(url)).ToArray();
}

Ваш основной код должен быть немного очищен:

Observable.ToAsync<string, UserState[]>(Run)(path)
    .ObserveOnDispatcher()
    .Subscribe(o =>
    {
        foreach (var item in o)
        {
            WriteLog(item.StatusCode, item.Url);
        }                        
    });

Дайте мне знать, если это поможет.


РЕДАКТИРОВАТЬ: Добавлен образец FromEventPattern код в соответствии с запросом OP в комментариях.

Вот пример использования Windows Forms FromEventPattern. Первая часть создает способ очистки подписок при закрытии формы.

public partial class Form1 : Form
{
    public Form1()
    {
        InitializeComponent();

        // Create a collection of IDisposable
        // to allow clean-up of subscriptions
        var subscriptions =
            new System.Reactive.Disposables.CompositeDisposable();

        var formClosings = Observable
            .FromEventPattern<FormClosingEventHandler, FormClosingEventArgs>(
                h => this.FormClosing += h,
                h => this.FormClosing -= h);

        // Add a subscription that cleans up subscriptions
        // when the form closes
        subscriptions.Add(
            formClosings
                .Subscribe(ea => subscriptions.Dispose()));

Эта следующая часть отслеживает перетаскивание мышью в графическом окне и создает сообщения, чтобы сообщить пользователю, как далеко они перетащили.

        var pictureBox1MouseDowns = Observable
            .FromEventPattern<MouseEventHandler, MouseEventArgs>(
                h => pictureBox1.MouseDown += h,
                h => pictureBox1.MouseDown -= h);

        var pictureBox1MouseMoves = Observable
            .FromEventPattern<MouseEventHandler, MouseEventArgs>(
                h => pictureBox1.MouseMove += h,
                h => pictureBox1.MouseMove -= h);

        var pictureBox1MouseUps = Observable
            .FromEventPattern<MouseEventHandler, MouseEventArgs>(
                h => pictureBox1.MouseUp += h,
                h => pictureBox1.MouseUp -= h);

        var pictureBox1MouseDrags =
            from md in pictureBox1MouseDowns
            from mm in pictureBox1MouseMoves.TakeUntil(pictureBox1MouseUps)
            let dx = mm.EventArgs.Location.X - md.EventArgs.Location.X
            let dy = mm.EventArgs.Location.Y - md.EventArgs.Location.Y
            select new Point(dx, dy);

        var pictureBox1MouseDragMessages =
            from md in pictureBox1MouseDrags
            let f = "You've dragged ({0}, {1}) from your starting point"
            select String.Format(f, md.X, md.Y);

Следующая часть отслеживает количество нажатий кнопки и создает сообщения для отображения пользователю.

        var button1ClickCount = 0;

        var button1Clicks = Observable
            .FromEventPattern(
                h => button1.Click += h,
                h => button1.Click -= h);

        var button1ClickCounts =
            from c in button1Clicks
            select ++button1ClickCount;

        var button1ClickMessages =
            from cc in button1ClickCounts
            let f = "You clicked the button {0} time{1}"
            select String.Format(f, cc, cc == 1 ? "" : "s");

Наконец, два списка сообщений объединяются и подписываются, помещая сообщение в метку.

        var messages = pictureBox1MouseDragMessages
            .Merge(button1ClickMessages);

        // Add a subscription to display the
        // merged messages in the label
        subscriptions.Add(
            messages
                .Subscribe(m => label1.Text = m));
    }
}

Имейте в виду, что все это находится в конструкторе формы и никакие поля или свойства уровня модуля не используются, и все обработчики событий удаляются при закрытии формы. Очень аккуратные вещи.

...