Rx.NET: объединить наблюдаемые в порядке - PullRequest
0 голосов
/ 11 мая 2018

У меня есть 2 IConnectableObservable s, где одно воспроизводит старые исторические сообщения, а другое выдает свежие текущие значения:

HistoricObservable: - 1 - 2 - 3 - 4 - 5 - 6 - 7 - 8 - ...
CurrentObservable:    - - - - - 5 - 6 - 7 - 8 - 9 - 10 - ...

Как я могу объединить их в одну наблюдаемую так, чтобы я получил полную (правильную) последовательность из обеих наблюдаемых, но также отбросил подписку и вызвал Dispose для подписки HistoricObservable, как только я начал испускать значения из CurrentObservable.

MergedObservable: - 1 - 2 - 3 - 4 - 56 - 7 - 8 - 9 - 10 - ...

Мои сообщения идентифицируются с помощью Guid , поэтому решение может сравнивать их только с использованием Equal и не может полагаться ни на какой порядок, кроме как он излучается каждой из наблюдаемых.

Короче, я собираюсь заполнить метод:

public static IObservable<T> MergeObservables<T>(
    IObservable<T> historicObservable,
    IObservable<T> currentObservable)
    where T : IEquatable<T>
{
    throw new NotImplementedException();
}

MergedObservable должен продолжать отправлять значения из HistoricObservable, не дожидаясь первого значения из CurrentObservable, и если первое значение из CurrentObservable уже было отправлено ранее, тогда MergedObservable должен пропустить любые значения в уже выданном CurrentObservable, утилизировать подписку на HistoricObservable и начните принимать все новые значения из CurrentObservable. Я также не хочу немедленно переключаться, когда первый объект испускается CurrentObservable, пока я не доберусь до этой точки в HistoricObservable, так что мне было трудно пытаться использовать TakeWhile / TakeUntil. У меня был небольшой успех с CombineLatest для сохранения состояния, но я думаю, что, возможно, есть лучший способ.

Контрольные примеры

Для следующих тестовых случаев предположим, что каждое сообщение представлено GUID следующим образом:

A = E021ED8F-F0B7-44A1-B099-9878C6400F34
B = 1139570D-8465-4D7D-982F-E83A183619DE
C = 0AA2422E-19D9-49A7-9E8C-C9333FC46C46
D = F77D0714-2A02-4154-A44C-E593FFC16E3F
E = 14570189-4AAD-4D60-8780-BCDC1D23273D
F = B42983F0-5161-4165-A2F7-074698ECCE77
G = D2506881-F8AB-447F-96FA-896AEAAD1D0A
H = 3063CB7F-CD25-4287-85C3-67C609FA5679
I = 91200C69-CC59-4488-9FBA-AD2D181FD276
J = 2BEA364E-BE86-48FF-941C-4894CEF7A257
K = 67375907-8587-4D77-9C58-3E3254666303
L = C37C2259-C81A-4BC6-BF02-C96A34011479
M = E6F709BE-8910-42AD-A100-2801697496B0
N = 8741D0BB-EDA9-4735-BBAF-CE95629E880D

1) Если историческая наблюдаемая никогда не догоняет текущую наблюдаемую, тогда объединенная наблюдаемая никогда не должна излучать что-либо из текущей наблюдаемой

Historic: - A - B - C - D - E - F - G - H|
Current:    - - - - - - - - - - - - - - - I - J - K - L - M - N|
Merged:   - A - B - C - D - E - F - G - H|

2) Как только историческая наблюдаемая достигает первого значения, испускаемого текущей наблюдаемой, тогда объединенная наблюдаемая должна немедленно испустить все значения, ранее испущенные текущей наблюдаемой, и отключиться от исторической наблюдаемой.

Historic: - A - B - C - D - E - F - G - H - I - J|
Current:  - - - - - - E - F - G - H - I - J|
Merged:   - A - B - C - D - EF-G- H - I - J|

3) Решение должно быть в состоянии обрабатывать значения, поступающие от текущей наблюдаемой до исторической наблюдаемой.

Historic: - - - - - A - B - C - D - E - F - G - H - I - J|
Current:  - - C - D - E - F - G - H - I - J - K - L - M - N|
Merged:   - - - - - A - B - CDEF-G-H- I - J - K - L - M - N|

4) Если значения из текущих наблюдаемых уже были переданы, то решение должно пропускать их, пока не будет получено новое значение.

Historic: - A - B - C - D - E - F - G - H - I - J|
Current:  - - - - - - - - B - C - D - E - F - G - H - I - J|
Merged:   - A - B - C - D - - - - - - E - F - G - H - I - J|

5) Для моих случаев использования я гарантирую, что текущая наблюдаемая будет подмножеством исторического, но для полноты картины я бы предположил, что решение будет продолжать извлекать из исторического наблюдаемого мышления, что первый элемент будет происходить при позже точка

Historic: - - - - - E - F - G - H - I - J - ... - Z - A|
Current:  - - A - B - C - D - E - F - G - H - I - J|
Merged:   - - - - - E - F - G - H - I - J - ... - Z - ABCDEFGHIJ|

6) Я также гарантирую, что историческая наблюдаемая не будет отличаться от текущей наблюдаемой, когда они синхронизированы, но если по какой-то причине они делают объединенную наблюдаемую, она должна уже отключиться от нее и не выберет до любых отличий

Historic: - A - B - C - D - E - D - C - B - A|
Current:  - - - - - - E - F - G - H - I - J|
Merged:   - A - B - C - D - EF-G- H - I - J|

Помощь по созданию рабочего решения, вот некоторые входные данные:

var historic = new Subject<int>();
var current = new Subject<int>();

// query & subscription goes here

historic.OnNext(1);
historic.OnNext(2);
current.OnNext(5);
historic.OnNext(3);
current.OnNext(6);
historic.OnNext(4);
current.OnNext(7);
historic.OnNext(5);
current.OnNext(8);
historic.OnNext(6);
current.OnNext(9);
historic.OnNext(7);
current.OnNext(10);

Правильное решение должно давать числа от 1 до 10.

Ответы [ 4 ]

0 голосов
/ 15 мая 2018

Предполагая, что вам нужны отличные результаты независимо от порядка появления, возможно, этот подход тоже работает:

       var replayCurrent = current.Replay();
        replayCurrent.Connect();


        var merged = historic
            .Scan(
                new { history = new List<string>(), firstVal = (string)null },
                (state, val) =>
                { state.history.Add(val); return state; }
                )
            .Merge(
                current.Take(1).Select(v => new { history = (List<string>)null, firstVal = v })

                )
            .Scan(new { history = (List<string>)null, firstVal = (string)null },
                (state, val) =>
                new { history = val.history ?? state.history, firstVal = val.firstVal ?? state.firstVal })
            .TakeWhile(v => 
                (null==v.firstVal || ( null!=v.firstVal && !v.history.Contains(v.firstVal)))
                )
            .Select(v=>v.history.Last())
            .Concat(replayCurrent)
            .Distinct();

        merged.Subscribe(x => Console.WriteLine(x));
0 голосов
/ 13 мая 2018

Попробуйте:

var historic = new Subject<int>();
var current = new Subject<int>();

var subscription =
    Observable
        .Defer(() =>
        {
            var c = int.MaxValue;
            return
                current
                    .Do(x => { if (c == int.MaxValue) c = x; })
                    .Publish(pc =>
                        historic
                            .TakeWhile(h => h < c)
                            .Publish(ph =>
                                Observable
                                    .Merge(
                                        ph.Delay(x => pc.FirstOrDefaultAsync(y => x < y)),
                                        pc.Delay(y => ph.FirstOrDefaultAsync(x => y < x)))))
                    .Distinct();
        })
        .Subscribe(x => Console.WriteLine(x));

historic.OnNext(1);
historic.OnNext(2);
current.OnNext(5);
historic.OnNext(3);
current.OnNext(6);
historic.OnNext(4);
current.OnNext(7);
historic.OnNext(5);
current.OnNext(8);
historic.OnNext(6);
current.OnNext(9);
historic.OnNext(7);
current.OnNext(10);

Это дает от 1 до 10 в порядке.Может потребоваться небольшая настройка, чтобы получить правильные исторические и текущие значения.


Чтобы подтвердить, что правильные значения выводятся, попробуйте следующее:

var subscription =
    Observable
        .Defer(() =>
        {
            var c = int.MaxValue;
            return
                current
                    .Do(x => { if (c == int.MaxValue) c = x; })
                    .Select(x => new { source = "current", value = x })
                    .Publish(pc =>
                        historic
                            .TakeWhile(h => h < c)
                            .Finally(() => Console.WriteLine("!"))
                            .Select(x => new { source = "historic", value = x })
                            .Publish(ph =>
                                Observable
                                    .Merge(
                                        ph.Delay(x => pc.FirstOrDefaultAsync(y => x.value < y.value)),
                                        pc.Delay(y => ph.FirstOrDefaultAsync(x => y.value < x.value)))))
                    .Distinct(x => x.value);
        })
        .Subscribe(x => Console.WriteLine($"{x.source}:{x.value}"));

Выходэто:

historic:1
historic:2
historic:3
historic:4
current:5
current:6
current:7
!
current:8
current:9
current:10

! показывает, где располагается историческая наблюдаемая.

0 голосов
/ 14 мая 2018

Возвращаясь к этому, я, наконец, добился некоторого прогресса в том направлении, в котором я шел. Получил возможность пройти все тестовые примеры в описании, поэтому я думаю, что этого будет достаточно для моих вариантов использования. Конструктивная обратная связь всегда ценится.

public static IObservable<T> CombineObservables<T>(
    IObservable<T> historicObservable,
    IObservable<T> currentObservable)
    where T : IEquatable<T>
{
    var cachedCurrent = currentObservable.Replay();
    cachedCurrent.Connect();

    var firstMessage = cachedCurrent.FirstAsync();

    var emittedHistoryItems = new List<T>();

    var part1 = historicObservable.TakeUntil(firstMessage)
                                  .Do(x => emittedHistoryItems.Add(x));

    var part2 = historicObservable.CombineLatest(firstMessage, Tuple.Create)
                                  .TakeWhile(x =>
                                             {
                                                 var historyItem = x.Item1;
                                                 var first = x.Item2;

                                                 return !emittedHistoryItems.Any(y => y.Equals(first)) && !historyItem.Equals(first);
                                             })
                                  .Select(x => x.Item1)
                                  .Do(x => emittedHistoryItems.Add(x));

    var part3 = cachedCurrent.SkipWhile(x => emittedHistoryItems.Contains(x));

    return part1.Concat(part2).Concat(part3);
}

Пример скрипки: https://dotnetfiddle.net/6BqfiW

0 голосов
/ 12 мая 2018

Если я правильно понимаю ваш вопрос, вы могли бы

  • Создание наблюдаемой, которая испускает две наблюдаемые
  • Первый является «историческим» и может быть введен с помощью StartsWith
  • Второй является «текущим» и предоставляется только на первый элемент текущего, что означает, что вам нужно подписаться на это наблюдаемое с помощью Take (1) или чего-то еще, чтобы получить первый элемент, и. Выберите наблюдаемый сам. (При необходимости можно добавить .Where и .Scan для сравнения значений в 2 последовательностях)
  • Объедините два с .Switch. Коммутатор откажется от исторического и с этого момента будет следовать за током.

UPDATE:

Я не думаю, что данные примера отражают проблему, по крайней мере, не так, как я ее понимаю. Попробуйте это

var historic = new Subject<int>();
var current = new Subject<int>();
var observables = new Subject<IObservable<int>>();
int[] tracker = new int[] { int.MaxValue, int.MaxValue };

var merged = historic
                .Select(x => new { source = (int)0, val = x })
                .Merge(current.Select(y => new { source = (int)1, val = y }))
                .Scan((state, v) => {
                        tracker[v.source] = v.val;
                        return v; })
                .Do(x => {
                    if ((tracker[1] <= tracker[0]) && tracker[0] != int.MaxValue)
                        observables.OnNext(current.StartWith(x.val));
                    })
                .Where(x => x.source == 0)
                .Select(x => x.val);

var streamSelector = observables.Switch();

streamSelector.Subscribe(x => Console.WriteLine(x));
observables.OnNext(merged);

historic.OnNext(1);
historic.OnNext(2);
current.OnNext(5);
historic.OnNext(3);
current.OnNext(6);
historic.OnNext(4);
current.OnNext(7);
historic.OnNext(5);
current.OnNext(8);
historic.OnNext(6);
current.OnNext(9);
historic.OnNext(7);
current.OnNext(10);
...