Как отменить наблюдаемую последовательность - PullRequest
5 голосов
/ 20 июля 2011

У меня есть очень простой IObservable<int>, который действует как генератор импульсов каждые 500 мс:

var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
                                         i => TimeSpan.FromMilliseconds(500))

И у меня есть CancellationTokenSource (который используется для отмены другой работы, которая выполняется одновременно).

Как я могу использовать источник токена отмены для отмены моей наблюдаемой последовательности?

Ответы [ 4 ]

7 голосов
/ 03 сентября 2014

Это старый поток, но только для справки в будущем, здесь есть более простой способ сделать это.

Если у вас есть CancellationToken, вы, вероятно, уже работаете с задачами.Итак, просто преобразуйте его в задачу и позвольте каркасу выполнить привязку:

using System.Reactive.Threading.Tasks;
...
var task = myObservable.ToTask(cancellationToken);

Это создаст внутреннего подписчика, который будет удален при отмене задачи.Это сделает свое дело в большинстве случаев, потому что большинство наблюдаемых производят значения только при наличии подписчиков.

Теперь, если у вас есть фактическая наблюдаемая, которую нужно по какой-то причине устранить (возможно, горячая наблюдаемая, которая больше не важна, если родительская задача отменена), это можно сделать с помощью продолжения:

disposableObservable.ToTask(cancellationToken).ContinueWith(t => {
    if (t.Status == TaskStatus.Canceled)
        disposableObservable.Dispose();
});
6 голосов
/ 21 июля 2011

Если вы используете GenerateWithTime (теперь заменен на Generate, передавая перегрузку func с временным интервалом), вы можете заменить второй параметр, чтобы определить состояние токена отмены следующим образом:

var pulses = Observable.Generate(0,
    i => !ts.IsCancellationRequested,
    i => i + 1,
    i => i,
    i => TimeSpan.FromMilliseconds(500));

В качестве альтернативы, если ваше событие, которое вызывает установку токена отмены, может быть преобразовано в саму наблюдаемую, вы можете использовать что-то вроде следующего:

pulses.TakeUntil(CancelRequested);

Более подробное объяснение я также разместил на http://www.thinqlinq.com/Post.aspx/Title/Cancelling-a-Reactive-Extensions-Observable.

2 голосов
/ 20 июля 2011

Вы можете связать подписку IObservable с CancellationTokenSource следующим фрагментом

var pulses = Observable.GenerateWithTime(0,
    i => true, i => i + 1, i => i,
    i => TimeSpan.FromMilliseconds(500));

// Get your CancellationTokenSource
CancellationTokenSource ts = ...

// Subscribe
ts.Token.Register(pulses.Subscribe(...).Dispose);
0 голосов
/ 20 июля 2011

Вы получаете IDisposable экземпляр от подписки.Позвоните Dispose() по этому вопросу.

...