Как объединить несколько IO-наблюдаемых последовательностей? - PullRequest
8 голосов
/ 06 февраля 2011
        var a = Observable.Range(0, 10);
        var b = Observable.Range(5, 10);
        var zip = a.Zip(b, (x, y) => x + "-" + y);
        zip.Subscribe(Console.WriteLine);

Печать
0 - 5
1 - 6
2 - 7
...

Вместо этого я хотел бы объединить идентичные значения
5 - 5
6 - 6
7 - 7
8 - 8
...

Это упрощенный пример проблемы слияния сотен упорядоченных асинхронных последовательностей. Присоединиться к двум IEnumerable очень легко, но я не смог найти способ сделать что-то подобное в Rx. Есть идеи?

Подробнее о входах и о том, чего я пытаюсь достичь. По сути, вся система представляет собой конвейер реального времени с несколькими конечными автоматами (агрегаторами, буферами, фильтрами сглаживания и т. Д.), Соединенными с помощью шаблона fork-join. RX хорошо подходит для реализации таких вещей? Каждый вход может быть представлен как

public struct DataPoint
{
    public double Value;
    public DateTimeOffset Timestamp;
}

Каждый входной бит данных помечается по прибытии, поэтому все события естественным образом упорядочены по их ключу присоединения (отметка времени). По мере того, как события проходят через конвейер, они разветвляются и присоединяются. Соединения должны коррелироваться с отметкой времени и применяться в предварительно определенном порядке. Например, join (a, b, c, d) => join (объединение (join (a, b), c), d).

Редактировать Вот то, что я мог придумать в спешке. Надеемся, что существует более простое решение на основе существующих операторов Rx.

static void Test()
    {
        var a = Observable.Range(0, 10);
        var b = Observable.Range(5, 10);
        //var zip = a.Zip(b, (x, y) => x + "-" + y);
        //zip.Subscribe(Console.WriteLine);

        var joined = MergeJoin(a,b, (x,y) => x + "-" + y);
        joined.Subscribe(Console.WriteLine);
    }

static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
    {
        return Observable.CreateWithDisposable<string>(o =>
            {
                Queue<int> a = new Queue<int>();
                Queue<int> b = new Queue<int>();
                object gate = new object();

                left.Subscribe(x =>
                    {
                        lock (gate)
                        {
                            if (a.Count == 0 || a.Peek() < x)
                                a.Enqueue(x);

                            while (a.Count != 0 && b.Count != 0)
                            {
                                if (a.Peek() == b.Peek())
                                {
                                    o.OnNext(selector(a.Dequeue(), b.Dequeue()));
                                }
                                else if (a.Peek() < b.Peek())
                                {
                                    a.Dequeue();
                                }
                                else
                                {
                                    b.Dequeue();
                                }
                            }
                        }
                    });

                right.Subscribe(x =>
                {
                    lock (gate)
                    {
                        if (b.Count == 0 || b.Peek() < x)
                            b.Enqueue(x);

                        while (a.Count != 0 && b.Count != 0)
                        {
                            if (a.Peek() == b.Peek())
                            {
                                o.OnNext(selector(a.Dequeue(), b.Dequeue()));
                            }
                            else if (a.Peek() < b.Peek())
                            {
                                a.Dequeue();
                            }
                            else
                            {
                                b.Dequeue();
                            }
                        }
                    }
                });

                return Disposable.Empty;
            });

Ответы [ 4 ]

3 голосов
/ 06 февраля 2011

GroupBy может делать то, что вам нужно.Похоже, что у вас нет временных ограничений на то, когда элементы «объединяются», вам просто нужно, чтобы похожие элементы были вместе каким-то образом.

Observable.Merge(Observable.Range(1, 10), Observable.Range(5, 15))
.GroupBy(k => k)
.Subscribe( go => go.Count().Where(cnt => cnt > 1)
                            .Subscribe(cnt => 
                     Console.WriteLine("Key {0} has {1} matches", go.Key, cnt)));

Две вещи, о которых следует упомянуть, Merge имеет следующие перегрузки, поэтому ваш запрос о наличии сотен объединенных потоков не будет представлять проблему:

Merge<TSource>(params IObservable<TSource>[] sources);
Merge<TSource>(this IEnumerable<IObservable<TSource>> sources);
Merge<TSource>(this IObservable<IObservable<TSource>> source);

Более того, GroupBy возвращает IObservable<IGroupedObservable<TKey, TSource>>, что означает, что вы можете реагировать на каждую группу и каждую новуючлен каждой группы по мере их поступления - не нужно ждать, пока все завершится.

2 голосов
/ 08 февраля 2011

Честно говоря, я не могу придумать решение, основанное на существующих операторах, которое работает для горячих источников неизвестного порядка (то есть xs before ys против ys before xs). Ваше решение кажется хорошим (эй, если оно работает), но я бы внес несколько изменений, если бы это был мой код:

  • Поддержка отмены правильно с использованием MutableDisposable и CompositeDisposable
  • Вызовите OnError для исключений, генерируемых селектором (что делает его более согласованным с другими операторами)
  • Подумайте о поддержке завершения, если один источник может завершить работу раньше, чем другой

Приведенный ниже код был протестирован с вашим двухдиапазонным входом, с такими же перевернутыми входами, как и с Empty<int> + Never<int>:

public static IObservable<string> MergeJoin(
    IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
{
    return Observable.CreateWithDisposable<string>(o =>
    {
        Queue<int> a = new Queue<int>();
        Queue<int> b = new Queue<int>();
        object gate = new object();

        bool leftComplete = false;
        bool rightComplete = false;

        MutableDisposable leftSubscription = new MutableDisposable();
        MutableDisposable rightSubscription = new MutableDisposable();

        Action tryDequeue = () =>
        {
            lock (gate)
            {
                while (a.Count != 0 && b.Count != 0)
                {
                    if (a.Peek() == b.Peek())
                    {
                        string value = null;

                        try
                        {
                            value = selector(a.Dequeue(), b.Dequeue());
                        }
                        catch (Exception ex)
                        {
                            o.OnError(ex);
                            return;
                        }

                        o.OnNext(value);
                    }
                    else if (a.Peek() < b.Peek())
                    {
                        a.Dequeue();
                    }
                    else
                    {
                        b.Dequeue();
                    }
                }
            }
        };

        leftSubscription.Disposable = left.Subscribe(x =>
        {
            lock (gate)
            {
                if (a.Count == 0 || a.Peek() < x)
                    a.Enqueue(x);

                tryDequeue();

                if (rightComplete && b.Count == 0)
                {
                    o.OnCompleted();
                }
            }
        }, () =>
        {
            leftComplete = true;

            if (a.Count == 0 || rightComplete)
            {
                o.OnCompleted();
            }
        });

        rightSubscription.Disposable = right.Subscribe(x =>
        {
            lock (gate)
            {
                if (b.Count == 0 || b.Peek() < x)
                    b.Enqueue(x);

                tryDequeue();

                if (rightComplete && b.Count == 0)
                {
                    o.OnCompleted();
                }
            }
        }, () =>
        {
            rightComplete = true;

            if (b.Count == 0 || leftComplete)
            {
                o.OnCompleted();
            }
        });

        return new CompositeDisposable(leftSubscription, rightSubscription);
    });
}
2 голосов
/ 07 февраля 2011

Этот ответ скопирован с Rx форумов , просто для того, чтобы он также был заархивирован здесь:

var xs = Observable.Range(1, 10);
var ys = Observable.Range(5, 10);

var joined = from x in xs
    from y in ys
    where x == y
    select x + "-" + y;

Или без использования выражений запроса:

var joined = 
    xs.SelectMany(x => ys, (x, y) => new {x, y})
    .Where(t => t.x == t.y)
    .Select(t => t.x + "-" + t.y);
1 голос
/ 06 февраля 2011

Как насчет использования нового оператора Join в v.2838.

var a = Observable.Range(1, 10);
var b = Observable.Range(5, 10);

var joinedStream = a.Join(b, _ => Observable.Never<Unit>(), _ => Observable.Never<Unit>(), 
    (aOutput, bOutput) => new Tuple<int, int>(aOutput,  bOutput))
    .Where(tupple => tupple.Item1 == tupple.Item2);

joinedStream.Subscribe(output => Trace.WriteLine(output));

Это мой первый взгляд на Join, и я не уверен, что было бы разумно использовать оператор Never таким образом.При работе с большими объемами входных данных, поскольку это приведет к огромному количеству операций, было получено больше входных данных.Я думаю, что можно было бы сделать работу, чтобы закрыть окна по мере согласования и сделать решение более эффективным.Тем не менее, приведенный выше пример работает в соответствии с вашим вопросом.

Для протокола, я думаю, что ответ Скотта, вероятно, является подходом в данном случае.Я просто добавляю это как потенциальную альтернативу.

...