Как я могу объединить два потока, упорядоченных и сгруппированных по меткам времени? - PullRequest
6 голосов
/ 16 марта 2012

У меня есть два потока объектов, каждый из которых имеет значение Timestamp. Оба потока расположены по порядку, поэтому, например, временные метки могут быть T a = 1,3,6,6,7 в одном потоке и T b = 1,2,5,5,6,8 в другом. Объекты в обоих потоках относятся к одному типу.

То, что я хотел бы сделать, это поместить каждое из этих событий в шину в порядке отметки времени, то есть положить A 1 , затем B 1 , B 2 , A 3 и так далее. Кроме того, поскольку некоторые потоки имеют несколько (последовательных) элементов с одинаковой временной меткой, я хочу, чтобы эти элементы были сгруппированы так, чтобы каждое новое событие представляло собой массив. Таким образом, мы добавили бы [A 3 ] в шину, затем [A 1 5 , A 2 5 ] и т. Д.

Я пытался реализовать это, создав две ConcurrentQueue структуры, поместив каждое событие в конец очереди, затем просматривая каждый фронт очереди, выбирая сначала более раннее событие, а затем обходя очередь так, чтобы все события с этой отметкой времени присутствуют.

Однако я столкнулся с двумя проблемами:

  • Если я оставлю эти очереди неограниченными, мне быстро не хватит памяти, поскольку операция чтения намного быстрее, чем обработчики, получающие события. (У меня есть несколько гигабайт данных).
  • Иногда я сталкиваюсь с ситуацией, когда я обрабатываю событие, скажем, A 1 5 до того, как A 2 5 прибыли. Мне как-то нужно остерегаться этого.

Я думаю, что Rx может помочь в этом отношении, но я не вижу очевидного комбинатора (ов), чтобы сделать это возможным. Таким образом, любой совет очень ценится.

1 Ответ

10 голосов
/ 17 марта 2012

Rx действительно хорошо подходит для этой проблемы IMO.

IObservables не может 'OrderBy' по очевидным причинам (сначала вам нужно было бы наблюдать весь поток, чтобы гарантировать правильный порядок вывода),поэтому мой ответ ниже предполагает (что вы заявили), что ваши 2 исходных потока событий находятся в порядке.

В конце концов, это была интересная проблема.В стандартных операторах Rx отсутствует GroupByUntilChanged, который мог бы легко это решить, поскольку он вызывал OnComplete в предыдущей группе, наблюдаемой при наблюдении первого элемента следующей группы.Однако, глядя на реализацию DistinctUntilChanged, она не следует этому шаблону и вызывает OnComplete только тогда, когда завершается исходная наблюдаемая (даже несмотря на то, что она знает, что не будет больше элементов после первого не различимого элемента ... странно???).В любом случае, по этим причинам я решил отказаться от метода GroupByUntilChanged (чтобы не нарушать соглашения Rx) и вместо этого пошел на ToEnumerableUntilChanged.

Отказ от ответственности: Это мое первое расширение Rx, поэтому буду признателен за отзыв о моемвыбор сделан.Кроме того, одной из моих главных проблем является анонимная наблюдаемая, содержащая список distinctElements.

Во-первых, код вашего приложения довольно прост:

    public class Event
    {
        public DateTime Timestamp { get; set; }
    }

    private IObservable<Event> eventStream1;
    private IObservable<Event> eventStream2; 

    public IObservable<IEnumerable<Event>> CombineAndGroup()
    {
        return eventStream1.CombineLatest(eventStream2, (e1, e2) => e1.Timestamp < e2.Timestamp ? e1 : e2)
            .ToEnumerableUntilChanged(e => e.Timestamp);
    }

Теперь для реализации ToEnumerableUntilChanged (Стена кода предупреждения):

    public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource,TKey>(this IObservable<TSource> source, Func<TSource,TKey> keySelector)
    {
        // TODO: Follow Rx conventions and create a superset overload that takes the IComparer as a parameter
        var comparer = EqualityComparer<TKey>.Default;

        return Observable.Create<IEnumerable<TSource>>(observer =>
        {
            var currentKey = default(TKey);
            var hasCurrentKey = false;
            var distinctElements = new List<TSource>();

            return source.Subscribe((value =>
            {
                TKey elementKey;
                try
                {
                    elementKey = keySelector(value);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }

                if (!hasCurrentKey)
                {
                    hasCurrentKey = true;
                    currentKey = elementKey;
                    distinctElements.Add(value);
                    return;
                }

                bool keysMatch;
                try
                {
                    keysMatch = comparer.Equals(currentKey, elementKey);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }

                if (keysMatch)
                {
                    distinctElements.Add(value);
                    return;
                }

                observer.OnNext( distinctElements);

                distinctElements.Clear();
                distinctElements.Add(value);
                currentKey = elementKey;

            }), observer.OnError, () =>
            {
                if (distinctElements.Count > 0)
                    observer.OnNext(distinctElements);

                observer.OnCompleted();
            });
        });
    }
...