У меня есть 2 IConnectableObservable s, где одно воспроизводит старые исторические сообщения, а другое выдает свежие текущие значения:
HistoricObservable: - 1 - 2 - 3 - 4 - 5 - 6 - 7 - 8 - ...
CurrentObservable: - - - - - 5 - 6 - 7 - 8 - 9 - 10 - ...
Как я могу объединить их в одну наблюдаемую так, чтобы я получил полную (правильную) последовательность из обеих наблюдаемых, но также отбросил подписку и вызвал Dispose для подписки HistoricObservable, как только я начал испускать значения из CurrentObservable.
MergedObservable: - 1 - 2 - 3 - 4 - 56 - 7 - 8 - 9 - 10 - ...
Мои сообщения идентифицируются с помощью Guid , поэтому решение может сравнивать их только с использованием Equal и не может полагаться ни на какой порядок, кроме как он излучается каждой из наблюдаемых.
Короче, я собираюсь заполнить метод:
public static IObservable<T> MergeObservables<T>(
IObservable<T> historicObservable,
IObservable<T> currentObservable)
where T : IEquatable<T>
{
throw new NotImplementedException();
}
MergedObservable должен продолжать отправлять значения из HistoricObservable, не дожидаясь первого значения из CurrentObservable, и если первое значение из CurrentObservable уже было отправлено ранее, тогда MergedObservable должен пропустить любые значения в уже выданном CurrentObservable, утилизировать подписку на HistoricObservable и начните принимать все новые значения из CurrentObservable. Я также не хочу немедленно переключаться, когда первый объект испускается CurrentObservable, пока я не доберусь до этой точки в HistoricObservable, так что мне было трудно пытаться использовать TakeWhile / TakeUntil. У меня был небольшой успех с CombineLatest для сохранения состояния, но я думаю, что, возможно, есть лучший способ.
Контрольные примеры
Для следующих тестовых случаев предположим, что каждое сообщение представлено GUID следующим образом:
A = E021ED8F-F0B7-44A1-B099-9878C6400F34
B = 1139570D-8465-4D7D-982F-E83A183619DE
C = 0AA2422E-19D9-49A7-9E8C-C9333FC46C46
D = F77D0714-2A02-4154-A44C-E593FFC16E3F
E = 14570189-4AAD-4D60-8780-BCDC1D23273D
F = B42983F0-5161-4165-A2F7-074698ECCE77
G = D2506881-F8AB-447F-96FA-896AEAAD1D0A
H = 3063CB7F-CD25-4287-85C3-67C609FA5679
I = 91200C69-CC59-4488-9FBA-AD2D181FD276
J = 2BEA364E-BE86-48FF-941C-4894CEF7A257
K = 67375907-8587-4D77-9C58-3E3254666303
L = C37C2259-C81A-4BC6-BF02-C96A34011479
M = E6F709BE-8910-42AD-A100-2801697496B0
N = 8741D0BB-EDA9-4735-BBAF-CE95629E880D
1) Если историческая наблюдаемая никогда не догоняет текущую наблюдаемую, тогда объединенная наблюдаемая никогда не должна излучать что-либо из текущей наблюдаемой
Historic: - A - B - C - D - E - F - G - H|
Current: - - - - - - - - - - - - - - - I - J - K - L - M - N|
Merged: - A - B - C - D - E - F - G - H|
2) Как только историческая наблюдаемая достигает первого значения, испускаемого текущей наблюдаемой, тогда объединенная наблюдаемая должна немедленно испустить все значения, ранее испущенные текущей наблюдаемой, и отключиться от исторической наблюдаемой.
Historic: - A - B - C - D - E - F - G - H - I - J|
Current: - - - - - - E - F - G - H - I - J|
Merged: - A - B - C - D - EF-G- H - I - J|
3) Решение должно быть в состоянии обрабатывать значения, поступающие от текущей наблюдаемой до исторической наблюдаемой.
Historic: - - - - - A - B - C - D - E - F - G - H - I - J|
Current: - - C - D - E - F - G - H - I - J - K - L - M - N|
Merged: - - - - - A - B - CDEF-G-H- I - J - K - L - M - N|
4) Если значения из текущих наблюдаемых уже были переданы, то решение должно пропускать их, пока не будет получено новое значение.
Historic: - A - B - C - D - E - F - G - H - I - J|
Current: - - - - - - - - B - C - D - E - F - G - H - I - J|
Merged: - A - B - C - D - - - - - - E - F - G - H - I - J|
5) Для моих случаев использования я гарантирую, что текущая наблюдаемая будет подмножеством исторического, но для полноты картины я бы предположил, что решение будет продолжать извлекать из исторического наблюдаемого мышления, что первый элемент будет происходить при позже точка
Historic: - - - - - E - F - G - H - I - J - ... - Z - A|
Current: - - A - B - C - D - E - F - G - H - I - J|
Merged: - - - - - E - F - G - H - I - J - ... - Z - ABCDEFGHIJ|
6) Я также гарантирую, что историческая наблюдаемая не будет отличаться от текущей наблюдаемой, когда они синхронизированы, но если по какой-то причине они делают объединенную наблюдаемую, она должна уже отключиться от нее и не выберет до любых отличий
Historic: - A - B - C - D - E - D - C - B - A|
Current: - - - - - - E - F - G - H - I - J|
Merged: - A - B - C - D - EF-G- H - I - J|
Помощь по созданию рабочего решения, вот некоторые входные данные:
var historic = new Subject<int>();
var current = new Subject<int>();
// query & subscription goes here
historic.OnNext(1);
historic.OnNext(2);
current.OnNext(5);
historic.OnNext(3);
current.OnNext(6);
historic.OnNext(4);
current.OnNext(7);
historic.OnNext(5);
current.OnNext(8);
historic.OnNext(6);
current.OnNext(9);
historic.OnNext(7);
current.OnNext(10);
Правильное решение должно давать числа от 1 до 10.