Проблема с архивированием более 2 наблюдаемых последовательностей - PullRequest
0 голосов
/ 04 октября 2018

Я ищу информацию, почему оператор Zip не работает с более чем 2 наблюдаемыми потоками:

var stream1 = Observable.Create<int>(o =>
        {
            o.OnNext(1);
            o.OnCompleted();
            return Disposable.Empty;
        });

        var stream2 = Observable.Create<int>(o =>
        {
            o.OnNext(1);
            o.OnCompleted();
            return Disposable.Empty;
        });

        var stream3 = Observable.Create<int>(o =>
        {
            o.OnNext(1);
            o.OnCompleted();
            return Disposable.Empty;
        });

        var stream4 = Observable.Create<int>(o =>
        {
            o.OnNext(1);
            o.OnCompleted();
            return Disposable.Empty;
        });

        var stream6 = stream1.Zip(stream2, stream3, stream4, (a, b, c, d) =>
        {
            return a + b + c + d;
        });

        var i = stream6.ToTask().GetAwaiter().GetResult();
        Console.WriteLine(i);
        Console.ReadKey();

Я получаю ошибку:

Последовательность содержитнет элементов

Оператор Zip отлично работает, когда я заархивирую только две последовательности.

Ниже я вставил трассировку стека:

   in System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   in System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   in System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   in ConsoleApp1.Program.Main(String[] args) w Path\Program.cs:line 58

1 Ответ

0 голосов
/ 05 октября 2018

Это выглядит для меня как ошибка или, в лучшем случае, дрянной дизайн.Чтобы упростить задачу:

var stream = Observable.Return(1);

var result2 = await stream.Zip(stream, (a, b) => (a, b));
var result3 = await stream.Zip(stream, stream, (a, b, c) => (a, b, c));

Console.WriteLine($"result2 = {result2}");
Console.WriteLine($"result3 = {result3}");

result2 работает, потому что zip дает значение.Наблюдаемая result3 не дает значения, поэтому ожидание не выполняется.Тем не менее, он должен дать значение.Вот документация по этой перегрузке:

Объединяет указанные наблюдаемые последовательности в одну наблюдаемую последовательность с помощью функции селектора всякий раз, когда все наблюдаемые последовательности создали элемент с соответствующим индексом.

Поскольку все они дали значение с индексом 0, вы должны увидеть значение.Итак ... ошибка.

Интересно, если вы переопределите stream как таковое:

var stream = Observable.Return(1).Delay(TimeSpan.FromMilliseconds(15));

... тогда оба сработают.Ошибка, вероятно, связана с некоторым состоянием гонки.

Я думаю, что парная функция (с 2 наблюдаемыми) старше и проверена лучше, чем n-мудрые перегрузки.

...