Reactive Framework (RX) и работа с событиями асинхронно - PullRequest
3 голосов
/ 09 июля 2010

Так что я просто играю с RX и изучаю его. Я начал играть с событиями и хотел знать, как подписаться на события и обрабатывать результаты в пакетном режиме асинхронно. Позвольте мне объяснить с помощью кода:

Простой класс, который вызывает события:

public class EventRaisingClass
{
   public event EventHandler<SomeEventArgs> EventOccured;

   //some other code that raises event...
}

public class SomeEventArgs : EventArgs
{
    public SomeEventArgs(int data)
    {
        this.SomeArg = data;
    }

    public int SomeArg { get; private set; }
}

Тогда мой главный:

public static void Main(string[] args)
{
    var eventRaiser = new EventRaisingClass();
    IObservable<IEvent<SomeEventArgs>> observable = 
        Observable.FromEvent<SomeEventArgs>(e => eventRaiser.EventOccured += e, e => eventRaiser.EventOccured -= e);

    IObservable<IList<IEvent<SomeEventArgs>>> bufferedEvents = observable.BufferWithCount(100);

    //how can I subscribte to bufferedEvents so that the subscription code gets called Async?
    bufferedEvents.Subscribe(list => /*do something with list of event args*/); //this happens synchrounously...

}

Как вы можете видеть в моих комментариях, когда вы просто так звоните подписаться, весь код подписки происходит синхронно. Есть ли выход из коробки с использованием RX, чтобы подписка вызывалась в разных потоках всякий раз, когда есть новый пакет событий для работы?

Ответы [ 2 ]

2 голосов
/ 09 июля 2010
bufferedEvents.ObserveOn(Scheduler.TaskPool).Subscribe(...

SubscribeOn - указать расписание, по которому происходят так называемые « побочные эффекты подписки ».Например, ваша наблюдаемая может открывать файл каждый раз, когда кто-то подписывается.

ObserveOn должен указывать расписание, по которому вызов наблюдателя будет происходить каждый раз, когда появляется новое значение.На практике он используется чаще, чем SubscribeOn.

2 голосов
/ 09 июля 2010

Я полагаю, вы ищете SubscribeOn или ObserveOn, передавая IScheduler. Есть несколько встроенных планировщиков под System.Concurrency; некоторые из них используют любой текущий поток, а другие используют определенные потоки.

Это видео содержит больше информации о концепции планировщика.

Команда Rx также недавно выпустила документ по практическим занятиям , который сейчас ближе всего к учебнику.

...