Ваш вопрос звучит так, как будто вы хотите выполнить эту операцию каждый раз, когда происходит событие, и вы в данный момент не выполняете операцию. Ваши комментарии предполагают, что вы хотите выполнить операцию только один раз.
Последнее проще:
var observable = Observable.FromEventPattern<T>(...);
var disp = observable.Take(1).Subscribe(ep => ...);
В первом случае я предполагаю, что вы намереваетесь выполнить эту длительную операцию в другом потоке. Если вы этого не сделаете, вы задержите поток, о котором Observable вызывает сообщения. SerialDisposable
твой друг здесь:
<Extension()>
Public Function IgnoreWhileExecuting(Of T)(source As IObservable(Of T),
onNext As Func(Of T, Task),
onError As Action(Of Exception),
onCompleted As Action
) As IDisposable
'argument validation/error handling skipped for sample
Dim serial As New SerialDisposable
Dim syncNext As Action(Of T) = Nothing
'this function will be called for the initial and subsequent subscriptions
Dim subscribe As Func(Of IDisposable) =
Function()
Return source.Subscribe(syncNext,
onError,
onCompleted)
End Function
'this function will be "suspend" the subscription while executing
'the long-running operation, but it must return immediately to
syncNext = Sub(v)
'stop the current subscription to the source
serial.Disposable.Dispose()
'perform the long running operation and follow
'with resubscription. This will resubscribe regardless
'of if the task completes successfully or not
onNext(v).ContinueWith(Sub() serial.Disposable = subscribe())
End Sub
serial.Disposable = subscribe()
Return serial
End Function
Поскольку это будет подписка и отмена подписки, обычно предполагается, что у вас есть горячая наблюдаемая, например, наблюдаемая событие, которое вы используете в своем вопросе. Вы можете проверить это в консольном приложении.
Sub Main()
Dim left = Observable.Interval(TimeSpan.FromMilliseconds(500))
Dim leftHot = left.Do(Sub(v) WriteTimestamped("Tick {0}", v)).Publish()
Dim f As New TaskFactory
Dim disp = leftHot _
.IgnoreWhileExecuting(Function(v)
Return f.StartNew(Sub(tparam)
WriteTimestamped("Before sleep {0}", tparam)
Thread.Sleep(2000)
WriteTimestamped("After sleep {0}", tparam)
End Sub,
v)
End Function,
Sub(ex) Console.WriteLine("Error in ignore: " & ex.ToString()),
Sub() Console.WriteLine("Completed from ignore"))
Dim con = leftHot.Connect()
Console.ReadKey()
disp.Dispose()
Console.ReadKey()
con.Dispose()
Console.ReadKey()
End Sub
Private Sub WriteTimestamped(ByVal format As String, ByVal arg As Object)
Console.WriteLine(Date.Now.ToString("HH:mm:ss.f") & " " & String.Format(format, arg))
End Sub
Чтобы увидеть разницу между горячим и холодным, удалите вызов Publish
и соответствующий Connect
и избавьтесь от соединения и снова запустите образец.