Это контракт на Rx. Создайте ноль или более значений, по одному за раз , и опционально заканчивайте либо одним OnError
, либо одним OnCompleted
.
Оператор .Synchronize()
предназначен для того, чтобы заставить несовместимое наблюдаемое быть совместимым.
Вызов xs1.Merge(xs2)
не блокирует параллельное выполнение. Исполнение двух источников не зависит друг от друга. Rx просто гарантирует, что значения выводятся по одному, чтобы не было проблем с параллелизмом.
Проблема с вашим кодом - это вычисления после слияния. Если вы выполняете свою работу до того, как жизнь слияния станет лучше.
Сравните это:
var xs1 =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => new { source = 1, value = x })
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp });
var xs2 =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => new { source = 2, value = x })
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp });
xs1.Merge(xs2)
.Do(x=> Thread.Sleep(2_000))
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, x.Value.generated, output = x.Timestamp })
.Subscribe(x => Console.WriteLine($"{x.source}: {x.value} - {x.generated.ToString("ss.fff")} - {x.output.ToString("ss.fff")}"));
Это производит:
1: 0 - 18.875 - 20.877
2: 0 - 18.884 - 22.882
1: 1 - 20.881 - 24.883
2: 1 - 22.882 - 26.884
1: 2 - 24.883 - 28.884
2: 2 - 26.884 - 30.884
1: 3 - 28.884 - 32.886
2: 3 - 30.884 - 34.887
1: 4 - 32.886 - 36.887
2: 4 - 34.887 - 38.887
А потом вот это:
var xs1 =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => new { source = 1, value = x })
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp })
.Do(x=> Thread.Sleep(2_000));
var xs2 =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => new { source = 2, value = x })
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, generated = x.Timestamp })
.Do(x=> Thread.Sleep(2_000));
xs1.Merge(xs2)
.Timestamp()
.Select(x => new { x.Value.source, x.Value.value, x.Value.generated, output = x.Timestamp })
.Subscribe(x => Console.WriteLine($"{x.source}: {x.value} - {x.generated.ToString("ss.fff")} - {x.output.ToString("ss.fff")}"));
Это дает:
1: 0 - 29.178 - 31.182
2: 0 - 29.180 - 31.201
1: 1 - 31.201 - 33.202
2: 1 - 31.201 - 33.202
2: 2 - 33.202 - 35.203
1: 2 - 33.202 - 35.203
1: 3 - 35.203 - 37.204
2: 3 - 35.203 - 37.204
1: 4 - 37.204 - 39.205
2: 4 - 37.204 - 39.205
Это удваивает выходную скорость наблюдаемой.