var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
var zip = a.Zip(b, (x, y) => x + "-" + y);
zip.Subscribe(Console.WriteLine);
Печать
0 - 5
1 - 6
2 - 7
...
Вместо этого я хотел бы объединить идентичные значения
5 - 5
6 - 6
7 - 7
8 - 8
...
Это упрощенный пример проблемы слияния сотен упорядоченных асинхронных последовательностей. Присоединиться к двум IEnumerable очень легко, но я не смог найти способ сделать что-то подобное в Rx. Есть идеи?
Подробнее о входах и о том, чего я пытаюсь достичь. По сути, вся система представляет собой конвейер реального времени с несколькими конечными автоматами (агрегаторами, буферами, фильтрами сглаживания и т. Д.), Соединенными с помощью шаблона fork-join. RX хорошо подходит для реализации таких вещей?
Каждый вход может быть представлен как
public struct DataPoint
{
public double Value;
public DateTimeOffset Timestamp;
}
Каждый входной бит данных помечается по прибытии, поэтому все события естественным образом упорядочены по их ключу присоединения (отметка времени). По мере того, как события проходят через конвейер, они разветвляются и присоединяются. Соединения должны коррелироваться с отметкой времени и применяться в предварительно определенном порядке. Например, join (a, b, c, d) => join (объединение (join (a, b), c), d).
Редактировать
Вот то, что я мог придумать в спешке. Надеемся, что существует более простое решение на основе существующих операторов Rx.
static void Test()
{
var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
//var zip = a.Zip(b, (x, y) => x + "-" + y);
//zip.Subscribe(Console.WriteLine);
var joined = MergeJoin(a,b, (x,y) => x + "-" + y);
joined.Subscribe(Console.WriteLine);
}
static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
{
return Observable.CreateWithDisposable<string>(o =>
{
Queue<int> a = new Queue<int>();
Queue<int> b = new Queue<int>();
object gate = new object();
left.Subscribe(x =>
{
lock (gate)
{
if (a.Count == 0 || a.Peek() < x)
a.Enqueue(x);
while (a.Count != 0 && b.Count != 0)
{
if (a.Peek() == b.Peek())
{
o.OnNext(selector(a.Dequeue(), b.Dequeue()));
}
else if (a.Peek() < b.Peek())
{
a.Dequeue();
}
else
{
b.Dequeue();
}
}
}
});
right.Subscribe(x =>
{
lock (gate)
{
if (b.Count == 0 || b.Peek() < x)
b.Enqueue(x);
while (a.Count != 0 && b.Count != 0)
{
if (a.Peek() == b.Peek())
{
o.OnNext(selector(a.Dequeue(), b.Dequeue()));
}
else if (a.Peek() < b.Peek())
{
a.Dequeue();
}
else
{
b.Dequeue();
}
}
}
});
return Disposable.Empty;
});