Добавление наблюдаемой последовательности после подписки - PullRequest
7 голосов
/ 10 ноября 2011

Мы используем Rx для мониторинга активности в нашем приложении silverlight, чтобы мы могли отображать сообщение пользователю после определенного периода бездействия.

Мы превращаем события (движения мыши и т. Д.) В наблюдаемые, а затем объединяем наблюдаемые вместе, чтобы создать одну (allActivity) наблюдаемую.Затем мы ограничиваем наблюдаемую allActivity, используя промежуток времени, и что-то подписывается на уведомление, когда система неактивна в течение определенного периода времени.

Как я могу добавить новую наблюдаемую / последовательность к ней после подписки (чтобы подписка подобрала это без отписки и повторной подписки).

например, объединить несколько последовательностей, газ, подписаться.Теперь добавьте дополнительную последовательность к наблюдаемой, на которую подписаны.

Пример кода:

private IObservable<DateTime> allActivity;
public void CreateActivityObservables(UIElement uiElement)
{
    // Create IObservables of event types we are interested in and project them as DateTimes
    // These are our observables sequences that can push data to subscribers/ observers 
    // NB: These are like IQueryables in the sense that they do not iterate over the sequence just provide an IObservable type
    var mouseMoveActivity = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(h => uiElement.MouseMove += h, h => uiElement.MouseMove -= h)
                                      .Select(o => DateTime.Now);

    var mouseLeftButtonActivity = Observable.FromEventPattern<MouseButtonEventHandler, MouseButtonEventArgs>(h => uiElement.MouseLeftButtonDown += h, h => uiElement.MouseLeftButtonDown -= h)
                                            .Select(o => DateTime.Now);

    var mouseRightButtonActivity = Observable.FromEventPattern<MouseButtonEventHandler, MouseButtonEventArgs>(h => uiElement.MouseRightButtonDown += h, h => uiElement.MouseRightButtonDown -= h)
                                             .Select(o => DateTime.Now);

    var mouseWheelActivity = Observable.FromEventPattern<MouseWheelEventHandler, MouseWheelEventArgs>(h => uiElement.MouseWheel += h, h => uiElement.MouseWheel -= h)
                                       .Select(o => DateTime.Now);

    var keyboardActivity = Observable.FromEventPattern<KeyEventHandler, KeyEventArgs>(h => uiElement.KeyDown += h, h => uiElement.KeyDown -= h)
                                     .Select(o => DateTime.Now);

    var streetViewContainer = HtmlPage.Document.GetElementById("streetViewContainer");
        var mouseMoveHandler = new EventHandler<HtmlEventArgs>(this.Moo);
        bool b = streetViewContainer.AttachEvent("mousemove", mouseMoveHandler);

    var browserActivity = Observable.FromEventPattern<Landmark.QDesk.ApplicationServices.IdleTimeoutService.MouseMoveHandler, HtmlEventArgs>(h => this.MyMouseMove += h, h => this.MyMouseMove -= h).Select(o => DateTime.Now);

    // Merge the IObservables<DateTime> together into one stream/ sequence
    this.allActivity = mouseMoveActivity.Merge(mouseLeftButtonActivity)
                                        .Merge(mouseRightButtonActivity)
                                        .Merge(mouseWheelActivity)
                                        .Merge(keyboardActivity)
                                        .Merge(browserActivity);
}

public IDisposable Subscribe(TimeSpan timeSpan, Action<DateTime> timeoutAction)
{
    IObservable<DateTime> timeoutNotification = this.allActivity.Merge   (IdleTimeoutService.GetDateTimeNowObservable())
                                                                .Throttle(timeSpan)
                                                                    .ObserveOn(Scheduler.ThreadPool);

    return timeoutNotification.Subscribe(timeoutAction);
}

Ответы [ 2 ]

17 голосов
/ 11 ноября 2011

Существует перегрузка для слияния, которая принимает IObservable,Сделайте внешнюю последовательность Субъектоми вызвать OnNext, когда вы хотите добавить другой источник в группу.Оператор слияния получит источник и подпишется на него:

var xss = new Subject<IObservable<int>>();
xss.Merge().Subscribe(x => Console.WriteLine(x));

xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(x => 23 + 8 * (int)x));
xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.8)).Select(x => 17 + 3 * (int)x));
xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(1.3)).Select(x => 31 + 2 * (int)x));
...
5 голосов
/ 11 ноября 2011

Самый простой способ сделать это - использовать промежуточный предмет вместо вызовов Merge.

Subject<DateTime> allActivities = new Subject<DateTime>();
var activitySubscriptions = new CompositeDisposable();

activitySubscriptions.Add(mouseMoveActivity.Subscribe(allActivities));
activitySubscriptions.Add(mouseLeftButtonActivity.Subscribe(allActivities));
//etc ...

//subscribe to activities
allActivities.Throttle(timeSpan)
             .Subscribe(timeoutAction);

//later add another
activitySubscriptions.Add(newActivity.Subscribe(allActivities));

Класс Subject прекратит передавать события OnNext (и далее OnError и OnCompleted) от любой из наблюдаемых, на которые он подписан, если он получит какие-либо OnError или OnCompleted.

Основное различие между этим подходом и вашим образцом заключается в том, что он подписывается на все события при создании субъекта, а не при подписке на объединенную наблюдаемую.Поскольку все наблюдаемые в вашем примере горячие, разница не должна быть заметной.

...