В моем приложении я получаю события из шины сообщений (rabbit mq, но на самом деле это не имеет значения). Обработка этих событий занимает довольно много времени, и события генерируются пачками. Одновременно следует обрабатывать только одно событие. Для этого я использую Rx, чтобы сериализовать события и выполнить обработку асинхронно, чтобы не блокировать производителя.
Вот (упрощенный) пример кода:
static void Main(string[] args)
{
var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
var subscription = input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
.Concat()
.Subscribe();
Console.ReadLine();
subscription.Dispose();
// Wait until finished?
Console.WriteLine("Completed");
}
private static async Task Process(CancellationToken cts, long i)
{
Console.WriteLine($"Processing {i} ...");
await Task.Delay(1000).ConfigureAwait(false);
Console.WriteLine($"Finished processing {i}");
}
Пример приложения удаляет подписку, а затем приложение завершается. Но в этом примере приложение завершает работу, в то время как последнее событие, полученное перед подпиской, все еще обрабатывается.
Вопрос: Как лучше всего дождаться обработки последнего события после удаления подписки? Я все еще пытаюсь понять суть асинхронного c Rx. Я предполагаю, что есть довольно простой способ сделать это, которого я сейчас просто не вижу.