Оператор слияния Rx и параллельное выполнение - PullRequest
0 голосов
/ 05 апреля 2019

Почему оператор .Merge ведет себя как .Synchronize?

У меня есть два потока событий, каждый из которых отправляет значения в разные потоки (например, используя Observable.Interval).

После использования оператора .Merge они блокируют друг друга в длинном операторе (только одна операция выполняется параллельно после объединения, как в операции .Synchronize()). Почему это?

var xs1 = Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(x => $"xs1 : {x}").Log("xs1 generating");
var xs2 = Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(x => $"xs1 : {x}").Log("xs2 generating");

xs1.Merge(xs2)
        .Do(x=> Thread.Sleep(2_000))
        .SubscribeConsoleWithThreads("after long work");

дает такие результаты:

13:44:14 thread 4 xs1 generating : "xs1 : 0"
13:44:14 thread 5 xs2 generating : "xs2 : 0"
13:44:16 thread 4 after long work: "xs1 : 0"
13:44:16 thread 4 xs1 generating : "xs1 : 1"
13:44:18 thread 5 after long work: "xs2 : 0"
13:44:18 thread 5 xs2 generating : "xs2 : 1"
13:44:20 thread 4 after long work: "xs1 : 1"
13:44:20 thread 4 xs1 generating : "xs1 : 2"

Как видите, значения "xs1: 0" и "xs2: 0" генерируются параллельно из обоих потоков, но после этого они генерируются один за другим.

Можно ли выполнить слияние с одним Observable без блокировки параллельного выполнения?

1 Ответ

0 голосов
/ 06 апреля 2019

Это контракт на Rx. Создайте ноль или более значений, по одному за раз , и опционально заканчивайте либо одним OnError, либо одним OnCompleted.

Оператор .Synchronize() предназначен для того, чтобы заставить несовместимое наблюдаемое быть совместимым.

Вызов xs1.Merge(xs2) не блокирует параллельное выполнение. Исполнение двух источников не зависит друг от друга. Rx просто гарантирует, что значения выводятся по одному, чтобы не было проблем с параллелизмом.

Проблема с вашим кодом - это вычисления после слияния. Если вы выполняете свою работу до того, как жизнь слияния станет лучше.

Сравните это:

var xs1 =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(x => new { source = 1, value = x })
        .Timestamp()
        .Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp });

var xs2 =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(x => new { source = 2, value = x })
        .Timestamp()
        .Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp });

xs1.Merge(xs2)
        .Do(x=> Thread.Sleep(2_000))
        .Timestamp()
        .Select(x => new { x.Value.source, x.Value.value, x.Value.generated, output = x.Timestamp })
        .Subscribe(x => Console.WriteLine($"{x.source}: {x.value} - {x.generated.ToString("ss.fff")} - {x.output.ToString("ss.fff")}"));

Это производит:

1: 0 - 18.875 - 20.877
2: 0 - 18.884 - 22.882
1: 1 - 20.881 - 24.883
2: 1 - 22.882 - 26.884
1: 2 - 24.883 - 28.884
2: 2 - 26.884 - 30.884
1: 3 - 28.884 - 32.886
2: 3 - 30.884 - 34.887
1: 4 - 32.886 - 36.887
2: 4 - 34.887 - 38.887

А потом вот это:

var xs1 =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(x => new { source = 1, value = x })
        .Timestamp()
        .Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp })
        .Do(x=> Thread.Sleep(2_000));

var xs2 =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(x => new { source = 2, value = x })
        .Timestamp()
        .Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp })
        .Do(x=> Thread.Sleep(2_000));

xs1.Merge(xs2)
        .Timestamp()
        .Select(x => new { x.Value.source, x.Value.value, x.Value.generated, output = x.Timestamp })
        .Subscribe(x => Console.WriteLine($"{x.source}: {x.value} - {x.generated.ToString("ss.fff")} - {x.output.ToString("ss.fff")}"));

Это дает:

1: 0 - 29.178 - 31.182
2: 0 - 29.180 - 31.201
1: 1 - 31.201 - 33.202
2: 1 - 31.201 - 33.202
2: 2 - 33.202 - 35.203
1: 2 - 33.202 - 35.203
1: 3 - 35.203 - 37.204
2: 3 - 35.203 - 37.204
1: 4 - 37.204 - 39.205
2: 4 - 37.204 - 39.205

Это удваивает выходную скорость наблюдаемой.

...