Наблюдение за синхронными и асинхронными результатами - PullRequest
3 голосов
/ 10 октября 2011

Используя Rx, я хочу наблюдать устаревший объект, который выставляет и метод GetItems, и событие NewItem.

Когда вызывается GetItems, он будет синхронно возвращать список любыхпредметы, которые есть в кеше.Он также создаст асинхронную выборку элементов, которые будут опубликованы через событие NewItem по мере их получения.

Как можно согласованно наблюдать оба этих источника (синхронизация + асинхронность), формулируязапрос LINQ такой, что оба набора результатов захвачены?Производственный заказ не имеет значения.

Ответы [ 2 ]

2 голосов
/ 10 октября 2011

Если я правильно понял ваш вопрос, это можно сделать, используя что-то вроде этого:

legacyObject.GetItems().ToObservable()
    .Merge(
        Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem")
            .Select(e => e.EventArgs.Value));

Редактировать : Энигмативность обнаружила условие расы для решения выше (см. Комментарии ниже). Надеюсь, этот решит, что:

Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem")
   .Select(e => e.EventArgs.Value)
   .Merge(Observable.Defer(() => legacyObject.GetItems().ToObservable()));

Вот остальная часть моего тестового кода, если это поможет. Я не знаю, точно ли я смоделировал ваш класс наследства:

class IntEventArgs : EventArgs
{
    public IntEventArgs(int value)
    {
        Value = value;
    }

    public int Value { get; private set; }
}

class Legacy
{
    public event EventHandler<IntEventArgs> NewItem;

    public IList<int> GetItems()
    {
        GenerateNewItemAsync();
        return new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
    }

    private void GenerateNewItemAsync()
    {
        Task.Factory.StartNew(() =>
        {
            for (var i = 0; i < 100000; ++i)
            {
                var handler = NewItem;
                if (handler != null) handler(this, new IntEventArgs(i));
                Thread.Sleep(500);
            }
        });
    }
}

class Example
{
    static void Main()
    {
        var legacyObject = new Legacy();

        legacyObject.GetItems().ToObservable()
            .Merge(
                Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem")
                    .Select(e => e.EventArgs.Value))
            .Subscribe(Console.WriteLine);

        Console.ReadLine();
    }
}
2 голосов
/ 10 октября 2011

Дайте мне посмотреть, понял ли я ваш наследственный объект. Я предполагаю, что это универсальный тип, который выглядит следующим образом:

public class LegacyObject<T>
{
    public IEnumerable<T> GetItems();
    public event EventHandler<NewItemEventArgs<T>> NewItem;
}

С новым аргументом события элемента вот так:

public class NewItemEventArgs<T> : System.EventArgs
{
    public T NewItem { get; private set; }
    public NewItemEventArgs(T newItem)
    {
        this.NewItem = newItem;
    }
}

Теперь я создал .ToObservable() метод расширения для LegacyObject<T>:

public static IObservable<T> ToObservable<T>(
    this LegacyObject<T> @this)
{
    return Observable.Create<T>(o =>
    {
        var gate = new object();
        lock (gate)
        {
            var list = new List<T>();
            var subject = new Subject<T>();
            var newItems = Observable
                .FromEventPattern<NewItemEventArgs<T>>(
                    h => @this.NewItem += h,
                    h => @this.NewItem -= h)
                .Select(ep => ep.EventArgs.NewItem);
            var inner = newItems.Subscribe(ni =>
            {
                lock (gate)
                {
                    if (!list.Contains(ni))
                    {
                        list.Add(ni);
                        subject.OnNext(ni);
                    }
                }
            });
            list.AddRange(@this.GetItems());
            var outer = list.ToArray().ToObservable()
                .Concat(subject).Subscribe(o);
            return new CompositeDisposable(inner, outer);
        }
    });
}

Этот метод создает новую наблюдаемую информацию для каждого подписчика - что правильно делать при написании таких методов расширения.

Создает объект gate для блокировки доступа к внутреннему списку.

Поскольку вы сказали, что при вызове GetItems создается асинхронная функция для получения новых элементов, я убедился, что подписка NewItem создана до вызова GetItems.

Подписка inner проверяет, находится ли новый элемент в списке или нет, и вызывает OnNext субъекта, если его нет в списке.

Вызов GetItems сделан и значения добавлены во внутренний список через AddRange.

Маловероятно, но возможно, что элементы не будут добавлены в список до того, как событие NewItem начнет срабатывать в другом потоке. Вот почему существует блокировка доступа к списку. Подписка inner будет ждать, пока она не сможет получить блокировку, прежде чем пытаться добавить элементы в список, и это произойдет после добавления начальных элементов в список.

Наконец, внутренний список превращается в наблюдаемый, связанный с субъектом, и наблюдатель o подписывается на это наблюдаемое.

Две подписки возвращаются как одна IDisposable с использованием CompositeDisposable.

И это все для метода ToObservable.

Теперь я проверил это, создав конструктор на унаследованном объекте, который позволил бы мне передавать как перечисляемые, так и наблюдаемые значения. Перечислимое возвращается, когда вызывается GetItems, а наблюдаемое запускает событие NewItem.

Итак, мой тестовый код выглядел так:

var tester = new Subject<int>();
var legacy = new LegacyObject<int>(new [] { 1, 2, 3, }, tester);

var values = legacy.ToObservable();

values.Subscribe(v => Console.WriteLine(v));

tester.OnNext(3);
tester.OnNext(4);
tester.OnNext(4);
tester.OnNext(5);

И значения, записанные в консоль, были:

1
2
3
4
5

Дайте мне знать, если это соответствует вашим потребностям.

...