Вот тестируемое представление вашей проблемы (или мраморная диаграмма), используя пакет NuGet Microsoft.Reactive.Testing
:
var scheduler = new TestScheduler();
var input1 = scheduler.CreateColdObservable<int>(
ReactiveTest.OnNext(1000.Ms(), 1),
ReactiveTest.OnNext(2000.Ms(), 2),
ReactiveTest.OnNext(3000.Ms(), 3),
ReactiveTest.OnNext(4000.Ms(), 4),
ReactiveTest.OnNext(5000.Ms(), 5),
ReactiveTest.OnNext(6000.Ms(), 6),
ReactiveTest.OnNext(7000.Ms(), 7)
);
var input2 = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(1400.Ms(), "a"),
ReactiveTest.OnNext(1500.Ms(), "b"),
ReactiveTest.OnNext(1600.Ms(), "c"),
ReactiveTest.OnNext(5500.Ms(), "d"),
ReactiveTest.OnNext(5600.Ms(), "e"),
ReactiveTest.OnNext(5700.Ms(), "f")
);
, который использует этот метод расширения:
public static class TickExtensions
{
public static long Ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}
Проблема заключается вв основном проблема конечного автомата, которая включает в себя две наблюдаемые разных типов.Лучший способ решить эту проблему - использовать тип Discrimination Union , которого нет в C #, поэтому мы его создадим.Ответ @ Sentinel сделал это с помощью Tuple, и это также может сработать:
public class DUnion<T1, T2>
{
public DUnion(T1 t1)
{
Type1Item = t1;
Type2Item = default(T2);
IsType1 = true;
}
public DUnion(T2 t2)
{
Type2Item = t2;
Type1Item = default(T1);
IsType1 = false;
}
public bool IsType1 { get; }
public bool IsType2 => !IsType1;
public T1 Type1Item { get; }
public T2 Type2Item { get; }
}
Затем мы можем взять два наших потока разной типизации, Select
и Merge
, в один распознаваемый поток объединения,где мы можем управлять государством с Scan
.Ваша логика состояния немного сложна, но выполнима:
- , если приходит число и в очереди нет элементов, ничего не делать
- , если число приходит и в очереди есть элементы, выпустить первый элемент в очереди.
- Если имеется более одного элемента, удалите недавнее удаление из очереди.
- Если в очереди есть только один элемент, не удаляйте его и переходите в «поддельное пустое» состояние.
- если приходит строка, вставьте ее вочередь.
- Если очередь является «поддельной пустой», извлеките последний элемент и выйдите из состояния «поддельной пустой».
Вот результат, наблюдаемый (используетПакет NuGet System.Collections.Immutable
):
var result = input1.Select(i => new DUnion<int, string>(i))
.Merge(input2.Select(s => new DUnion<int, string>(s)))
.Scan((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false), (state, dItem) => dItem.IsType1
? state.queue.IsEmpty
? (state.queue, null, false, false) //Is integer, but empty queue, so don't emit item
: state.queue.Dequeue().IsEmpty //Is integer, at least one item: dequeue unless only one item, then emit either way
? (state.queue, state.queue.Peek(), true, true)
: (state.queue.Dequeue(), state.queue.Peek(), false, true)
: state.isFakeEmptyState //Is new string, just add to queue, don't emit
? (state.queue.Dequeue().Enqueue(dItem.Type2Item), null, false, false)
: (state.queue.Enqueue(dItem.Type2Item), (string)null, false, false)
)
.Where(t => t.emit)
.Select(t => t.item);
Это можно проверить следующим образом:
var observer = scheduler.CreateObserver<string>();
result.Subscribe(observer);
scheduler.Start();
observer.Messages.Dump(); //Linqpad. Can replace with Console.Writeline loop.
Обновление : Я немного подумал об этоми я думаю, что имеет смысл бросить некоторых операторов вокруг функциональности Дискриминационного союза.Таким образом, вам не нужно явно иметь дело с типом:
public static class DUnionExtensions
{
public class DUnion<T1, T2>
{
public DUnion(T1 t1)
{
Type1Item = t1;
Type2Item = default(T2);
IsType1 = true;
}
public DUnion(T2 t2)
{
Type2Item = t2;
Type1Item = default(T1);
IsType1 = false;
}
public bool IsType1 { get; }
public bool IsType2 => !IsType1;
public T1 Type1Item { get; }
public T2 Type2Item { get; }
}
public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
{
return a.Select(x => new DUnion<T1, T2>(x))
.Merge(b.Select(x => new DUnion<T1, T2>(x)));
}
public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
TState initialState,
Func<TState, T1, TState> type1Handler,
Func<TState, T2, TState> type2Handler)
{
return source.Scan(initialState, (state, u) => u.IsType1
? type1Handler(state, u.Type1Item)
: type2Handler(state, u.Type2Item)
);
}
}
С этими методами расширения решение меняется на это, что, я думаю, читается лучше:
var result = input1
.Union(input2)
.ScanUnion((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false),
(state, _) => state.queue.IsEmpty
? (state.queue, null, false, false) //empty queue, so don't emit item
: state.queue.Dequeue().IsEmpty //At least one item: dequeue unless only one item, then emit either way
? (state.queue, state.queue.Peek(), true, true) //maintain last item, enter Fake-EmptyState
: (state.queue.Dequeue(), state.queue.Peek(), false, true),
(state, s) => state.isFakeEmptyState
? (state.queue.Dequeue().Enqueue(s), null, false, false)
: (state.queue.Enqueue(s), (string)null, false, false)
)
.Where(t => t.emit)
.Select(t => t.item);
Еслиу вас возникли проблемы с именованным синтаксисом кортежей, тогда вы можете использовать старые кортежи:
var result = input1
.Union(input2)
.ScanUnion(Tuple.Create(ImmutableQueue<string>.Empty, (string)null, false, false),
(state, _) => state.Item1.IsEmpty
? Tuple.Create(state.Item1, (string)null, false, false) //empty queue, so don't emit item
: state.Item1.Dequeue().IsEmpty //At least one item: dequeue unless only one item, then emit either way
? Tuple.Create(state.Item1, state.Item1.Peek(), true, true) //maintain last item, enter Fake-EmptyState
: Tuple.Create(state.Item1.Dequeue(), state.Item1.Peek(), false, true),
(state, s) => state.Item3
? Tuple.Create(state.Item1.Dequeue().Enqueue(s), (string)null, false, false)
: Tuple.Create(state.Item1.Enqueue(s), (string)null, false, false)
)
.Where(t => t.Item4)
.Select(t => t.Item2);