Это будет работать, хотя я не самый большой поклонник этого.
Я рассматривал это как ситуацию производителя / потребителя: один поток создает работу, а другой - выполняет.Тема producer
представляет поток, который добавляет работу.Все остальное представляет потребительскую сторону вещей.Если бы вы собрались class
, то producer
перешло бы в один класс, а все остальное в другой.
completedKeys
содержит ключи, которые выполнены, поэтому состояние для этого ключа будет выброшено.: новый элемент с этим ключом перейдет в конец строки.readyGate
представляет, когда потребитель вновь доступен для работы над следующей вещью.Объединение этого с последним из того, над чем работать, является сложной частью.WithLatestFrom
прекрасно работает, пока вы не получите пустой список..Where().FirstAsync()
прекрасно выполняет ожидающую часть.
Ключом ко всему этому является GroupByUntil
: это группирует вещи, и они естественным образом попадают в порядок, в котором ключи были добавлены первыми, то есть то, что вам нужно.Предложение Until
означает, что мы можем закрыть наблюдаемое, что приведет к тому, что новый элемент со старым ключом окажется в конце строки.DynamicCombinedLatest
превращает все эти наблюдаемые в список, который фактически является вашим состоянием.
В любом случае, вы идете:
var producer = new Subject<Item>();
var readyGate = new Subject<Unit>();
var completedKeys = new Subject<int>();
var Process = new Action<Item>(kvp =>
{
var str = $"{kvp.Key} : {kvp.Value}";
Console.WriteLine($"Start {str}");
Thread.Sleep(500); // simluate work
Console.WriteLine($"End {str}");
});
var groups = producer
.GroupByUntil(kvp => kvp.Key, kvp => kvp, go => completedKeys.Where(k => k == go.Key))
.DynamicCombineLatest();
var q = groups.Publish(_groups => readyGate
.ObserveOn(NewThreadScheduler.Default)
.WithLatestFrom(groups, (_, l) => l)
.SelectMany(l => l.Count == 0
? _groups.Where(g => g.Count > 0).FirstAsync()
: Observable.Return(l)
)
)
.Subscribe(l =>
{
var kvp = l[0];
completedKeys.OnNext(kvp.Key);
Process(kvp);
readyGate.OnNext(Unit.Default);
});
//Runner code:
producer.OnNext(new Item(1, "1-a"));
producer.OnNext(new Item(1, "1-b"));
producer.OnNext(new Item(2, "2-a"));
producer.OnNext(new Item(2, "2-b"));
readyGate.OnNext(Unit.Default);
await Task.Delay(TimeSpan.FromMilliseconds(100)); //to test if 1 gets done again and goes to the back of the line.
producer.OnNext(new Item(1, "1-c"));
И DynamicCombinedLatest
это (использует пакет nugetSystem.Collections.Immutable
):
public static IObservable<List<T>> DynamicCombineLatest<T>(this IObservable<IObservable<T>> source)
{
return source
.SelectMany((o, i) => o.Materialize().Select(notification => (observableIndex: i, notification: notification)))
.Scan((exception: (Exception)null, dict: ImmutableDictionary<int, T>.Empty), (state, t) => t.notification.Kind == NotificationKind.OnNext
? ((Exception)null, state.dict.SetItem(t.observableIndex, t.notification.Value))
: t.notification.Kind == NotificationKind.OnCompleted
? ((Exception)null, state.dict.Remove(t.observableIndex))
: (t.notification.Exception, state.dict)
)
.Select(t => t.exception == null
? Notification.CreateOnNext(t.dict)
: Notification.CreateOnError<ImmutableDictionary<int, T>>(t.exception)
)
.Dematerialize()
.Select(dict => dict.OrderBy(kvp => kvp.Key).Select(kvp => kvp.Value).ToList());
}