Как подавить следующее событие потока A всякий раз, когда запускается поток B - PullRequest
1 голос
/ 08 мая 2011

Я хочу остановить поток A ровно для одного уведомления при каждом запуске потока B. Оба потока останутся в сети и никогда не завершатся.

A: o--o--o--o--o--o--o--o--o  
B: --o-----o--------o-------  
R: o-----o-----o--o-----o--o  

или

A: o--o--o--o--o--o--o--o--o  
B: -oo----oo-------oo-------  
R: o-----o-----o--o-----o--o  

Ответы [ 2 ]

2 голосов
/ 09 мая 2011

Вот версия моего SkipWhen оператора, который я сделал для аналогичный вопрос (разница в том, что в оригинале несколько «B» пропустили бы несколько «A»):

public static IObservable<TSource> SkipWhen<TSource, TOther>(this IObservable<TSource> source, 
    IObservable<TOther> other)
{
    return Observable.Create<TSource>(observer =>
    {
        object lockObject = new object();
        bool shouldSkip = false;

        var otherSubscription = new MutableDisposable();
        var sourceSubscription = new MutableDisposable();

        otherSubscription.Disposable = other.Subscribe(
            x => { lock(lockObject) { shouldSkip = true; } });

        sourceSubscription.Disposable = source.Where(_ =>
        {
            lock(lockObject)
            {
                if (shouldSkip)
                {
                    shouldSkip = false;
                    return false;
                }
                else
                {
                    return true;
                }
            }
        }).Subscribe(observer);

        return new CompositeDisposable(
            sourceSubscription, otherSubscription);
    });
}

Если текущая реализация становится узким местом, рассмотрите возможность изменения реализации блокировки для использования ReaderWriterLockSlim.

2 голосов
/ 08 мая 2011

Это решение будет работать, когда наблюдаемая горячая (и без refCount):

streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);
  1. .takeUntil(streamB): сделать поток A завершенным после потока B, получая значение.
  2. .skip(1): сделать поток A пропустить одно значение при запуске (или в результате .repeat()).
  3. .repeat(): сделать поток A повтор (переподключение) неопределенно.
  4. .merge(streamA.take(1)): смещение эффекта .skip(1) в начале потока.

Пример создания пропуска потока каждые 5 секунд:

var streamA,
    streamB;

streamA = Rx.Observable
    .interval(1000)
    .map(function (x) {
        return 'A:' + x;
}).publish();

streamB = Rx.Observable
    .interval(5000);

streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);

streamA.connect();

Вы также можете использовать эту изолированную программную среду http://jsbin.com/gijorid/4/edit?js,console для выполнения BACTION() в журнале консоли во время выполнения кода для ручного перевода значения в streamB (что полезно для анализа кода).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...