Использование Rx для регулирования обратных вызовов от не асинхронных вызовов - PullRequest
2 голосов
/ 10 мая 2011

Мой вопрос похож на этот вопрос, но, как всегда, немного отличается.

В настоящее время я работаю над проектом, в котором используется простой шаблон для асинхронного получения уведомлений об изменениях базовых данных.

Class Foo отвечает за подписку на эти изменения данных и предоставляет метод для классов, чтобы зарегистрировать свою заинтересованность в этих изменениях путем регистрации экземпляра класса, который реализует данный интерфейс:

public Guid SubscribeToChanges(ISomeCallbackInterface callback)

Class Bar реализует этот обратный вызов и регистрирует себя:

public class Bar : ISomeCallbackInterface
{
    ...
    //initialise instance of Foo class and subscribe to updates
    _foo.SubscribeToChanges(this);
    ...

    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        ...
    }
}

В идеале мы хотели бы уменьшить эти обратные вызовы, поскольку мы можем, например, получить обратный вызов, который изменяет значение определенного фрагмента данных с x на yа затем обратно к х в течение секунды.Глядя на документацию Rx, это было бы тривиально с использованием оператора DistinctUntilChanged.

Однако вопрос заключается в том, как создать коллекцию IObservable, которую я затем могу применить к оператору, указанному выше в моей реализации обратного вызова.Документы очень четко рассказывают о том, как создать IObservable из стандартных событий .Net или пар методов Begin ... / End ....

ОБНОВЛЕНИЕ: Как отметил Ричард Сзалай в своем комментарии, мне нужно будетиспользуйте DistinctUntilChanged в тандеме с Throttle, чтобы добиться того, что мне нужно.

Опять же, благодаря Ричарду, я также должен был упомянуть, что мне нужна возможность отписаться от обратных вызовов.В текущей модели я просто вызываю Unscubscribe (Guid subscriptionToken) для экземпляра Foo.

1 Ответ

3 голосов
/ 11 мая 2011

Единственный способ сделать это, я думаю, с пользовательской реализацией ISomeCallbackInterface и Observable.Create. Тем не менее, он чувствует себя обутым в решение:

public static IObservable<Tuple<string,IEnumerable<Tuple<string, int, object>>> 
    FromCustomCallbackPattern(ISomeCallbackPublisher publisher)
{
    return Observable.CreateWithDisposable<T>(observer =>
    {
        var subject = new Subject<
            Tuple<string,IEnumerable<Tuple<string, int, object>>>();

        var callback = new ObservableSomeCallback(subject);

        Guid subscription = publisher.SubscribeToChanges(callback);

        return new CompositeDisposable(
            subject.Subscribe(observer),
            Disposable.Create(() => publisher.UnsubscribeFromChanges(subscription))
        );
    });
}

private class ObservableSomeCallback : ISomeCallbackInterface
{
    private IObserver<Tuple<string,IEnumerable<Tuple<string, int, object>>> observer;

    public ObservableSomeCallback(
        IObserver<Tuple<string,IEnumerable<Tuple<string, int, object>>> observer)
    {
        this.observer = observer;
    }

    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        this.observer.OnNext(new Tuple<
            string,IEnumerable<Tuple<string, int, object>>(id, data));
    }
}
...