лучший способ разобраться в RX? - PullRequest
2 голосов
/ 24 февраля 2012

Я использую rx для наблюдения за событием. Существует только один подписчик с длительной операцией.

Событие может инициироваться снова, когда предыдущая операция не завершена. как отбросить это событие?

Вот что я ищу: что-то вроде метода SubscribeWithDebounceAsync:

var observable = Observable.FromEventPattern<T>(obj, "OnSomeEvent");
observable.SubscribeWithDebounceAsync( ep => .... );

Ответы [ 2 ]

0 голосов
/ 01 марта 2012

Вот другая реализация:

public static class Extensions
{
    public static IDisposable SubscribeWithDebounceAsync<T>(
      this IObservable<T> source, 
      Action<T> longRunningTask)
    {
        var finished = new Subject<Unit>();
        var debounced = source
          .SkipUntil(finished)
          .Take(1)
          .Repeat()
          .Subscribe(
              t => Observable.Start(() => 
                {
                  longRunningTask(t);
                  finished.OnNext(Unit.Default);
                }));
        finished.OnNext(Unit.Default);
        return debounced;
    }
}

Последний завершен. Для запуска процесса есть следующий, в противном случае он останется навсегда на SkipUntil (завершено).

РЕДАКТИРОВАТЬ: сделал код более кратким.

0 голосов
/ 25 февраля 2012

Ваш вопрос звучит так, как будто вы хотите выполнить эту операцию каждый раз, когда происходит событие, и вы в данный момент не выполняете операцию. Ваши комментарии предполагают, что вы хотите выполнить операцию только один раз.

Последнее проще:

var observable = Observable.FromEventPattern<T>(...);
var disp = observable.Take(1).Subscribe(ep => ...);

В первом случае я предполагаю, что вы намереваетесь выполнить эту длительную операцию в другом потоке. Если вы этого не сделаете, вы задержите поток, о котором Observable вызывает сообщения. SerialDisposable твой друг здесь:

<Extension()>
Public Function IgnoreWhileExecuting(Of T)(source As IObservable(Of T),
                                           onNext As Func(Of T, Task),
                                           onError As Action(Of Exception),
                                           onCompleted As Action
                                          ) As IDisposable
    'argument validation/error handling skipped for sample
    Dim serial As New SerialDisposable

    Dim syncNext As Action(Of T) = Nothing
    'this function will be called for the initial and subsequent subscriptions
    Dim subscribe As Func(Of IDisposable) =
        Function()
            Return source.Subscribe(syncNext,
                                    onError,
                                    onCompleted)
        End Function
    'this function will be "suspend" the subscription while executing
    'the long-running operation, but it must return immediately to 
    syncNext = Sub(v)
                   'stop the current subscription to the source
                   serial.Disposable.Dispose()
                   'perform the long running operation and follow
                   'with resubscription.  This will resubscribe regardless
                   'of if the task completes successfully or not
                   onNext(v).ContinueWith(Sub() serial.Disposable = subscribe())
               End Sub
    serial.Disposable = subscribe()
    Return serial
End Function

Поскольку это будет подписка и отмена подписки, обычно предполагается, что у вас есть горячая наблюдаемая, например, наблюдаемая событие, которое вы используете в своем вопросе. Вы можете проверить это в консольном приложении.

Sub Main()
    Dim left = Observable.Interval(TimeSpan.FromMilliseconds(500))
    Dim leftHot = left.Do(Sub(v) WriteTimestamped("Tick {0}", v)).Publish()
    Dim f As New TaskFactory
    Dim disp = leftHot _
                   .IgnoreWhileExecuting(Function(v)
                                             Return f.StartNew(Sub(tparam)
                                                                   WriteTimestamped("Before sleep {0}", tparam)
                                                                   Thread.Sleep(2000)
                                                                   WriteTimestamped("After sleep {0}", tparam)
                                                               End Sub,
                                                               v)
                                         End Function,
                                         Sub(ex) Console.WriteLine("Error in ignore: " & ex.ToString()),
                                         Sub() Console.WriteLine("Completed from ignore"))
    Dim con = leftHot.Connect()
    Console.ReadKey()
    disp.Dispose()
    Console.ReadKey()
    con.Dispose()
    Console.ReadKey()
End Sub

Private Sub WriteTimestamped(ByVal format As String, ByVal arg As Object)
    Console.WriteLine(Date.Now.ToString("HH:mm:ss.f") & " " & String.Format(format, arg))
End Sub

Чтобы увидеть разницу между горячим и холодным, удалите вызов Publish и соответствующий Connect и избавьтесь от соединения и снова запустите образец.

...