Zipping Rx IObservable с бесконечным набором чисел - PullRequest
0 голосов
/ 25 марта 2010

У меня есть IObservable [именованные строки в примере ниже] из инфраструктуры расширений Reactive, и я хочу добавить индексные номера для каждого объекта, который он наблюдает.

Я пытался реализовать это с помощью функции Zip:

rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) => 
    new { Row = row, Index = index })
    .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

.. но, к сожалению, это бросает

ArgumentOutOfRangeException: Указанный аргумент находится вне диапазона допустимых значений. Наименование параметра: расходные материалы

Я неправильно понимаю функцию Zip или есть проблема с моим кодом?

Часть кода Range, похоже, не является проблемой, а IObservable пока не получает никаких событий.

Ответы [ 3 ]

1 голос
/ 25 апреля 2010

.Select имеет перегрузку для включения индекса:

rows.Select((row, index) => new { row, index });
0 голосов
/ 28 марта 2010

Я не уверен, в чем ваша проблема, это работает для вас (и чего здесь не хватает, что вы делаете?):

    static void Main(string[] args)
    {
        var rows = new List<int> { 4,5,1,2,5 }.ToObservable();
        rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
            new { Row = row, Index = index })
            .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

        Console.ReadLine();
    }
    static void ProcessRow(int row, int index) {
        Console.WriteLine("Row {0}, Index {1}", row, index);
    }
    static void Completed() {
    }
0 голосов
/ 25 марта 2010

Очевидно, что методы расширения Zip преобразуют исходную пользовательскую IObservable в анонимную наблюдаемую, и подписка на нее создает System.Collections.Generic.AnonymousObserver, который не реализует IDisposable. Таким образом, вы не можете реализовать метод Subscribe обычным способом (по крайней мере, так, как я его видел), то есть

public IDisposable Subscribe(IObserver<T> observer) {
  // ..add to observer list..
  return observer as IDisposable
}

Скорее всего, правильный ответ будет:

return Disposable.Create(() => Observers.Remove(observer));

Однако следует помнить, что коллизия, вероятно, будет изменена durin Completed-method, поэтому создайте копию списка перед их обработкой:

public void Completed()
{
    foreach (var observer in Observers.ToList())
    {
        observer.OnCompleted();
    }
 }
...