Реактивные расширения Observerable.FromAsyn c: как дождаться завершения операции asyn c - PullRequest
1 голос
/ 16 июня 2020

В моем приложении я получаю события из шины сообщений (rabbit mq, но на самом деле это не имеет значения). Обработка этих событий занимает довольно много времени, и события генерируются пачками. Одновременно следует обрабатывать только одно событие. Для этого я использую Rx, чтобы сериализовать события и выполнить обработку асинхронно, чтобы не блокировать производителя.

Вот (упрощенный) пример кода:

static void Main(string[] args)
{
   var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
   var subscription = input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
                           .Concat()
                           .Subscribe();

   Console.ReadLine();
   subscription.Dispose();
   // Wait until finished?
   Console.WriteLine("Completed");
}

private static async Task Process(CancellationToken cts, long i)
{
   Console.WriteLine($"Processing {i} ...");
   await Task.Delay(1000).ConfigureAwait(false);
   Console.WriteLine($"Finished processing {i}");
}

Пример приложения удаляет подписку, а затем приложение завершается. Но в этом примере приложение завершает работу, в то время как последнее событие, полученное перед подпиской, все еще обрабатывается.

Вопрос: Как лучше всего дождаться обработки последнего события после удаления подписки? Я все еще пытаюсь понять суть асинхронного c Rx. Я предполагаю, что есть довольно простой способ сделать это, которого я сейчас просто не вижу.

Ответы [ 2 ]

2 голосов
/ 16 июня 2020

Если вас устраивает быстрое и простое решение, которое применимо для таких простых случаев, как этот, вы можете просто Wait IObservable вместо того, чтобы подписываться на него. А если вы хотите получать уведомления, подобные подписке, используйте оператор Do непосредственно перед окончательным Wait:

static void Main(string[] args)
{
    var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
    input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
        .Concat()
        .Do(onNext: x => { }, onError: ex => { }, onCompleted: () => { })
        .Wait();
    Console.WriteLine("Completed");
}
1 голос
/ 16 июня 2020

Благодаря предложению Теодора я нашел решение, которое мне подходит:

static async Task Main(string[] args)
{
    var inputSequence = Observable.Interval(TimeSpan.FromMilliseconds(500));
    var terminate = new Subject<Unit>();
    var task = Execute(inputSequence, terminate);

    Console.ReadLine();
    terminate.OnNext(Unit.Default);
    await task.ConfigureAwait(false);
    Console.WriteLine($"Completed");
    Console.ReadLine();
}

private static async Task Process(CancellationToken cts, long i)
{
    Console.WriteLine($"Processing {i} ...");
    await Task.Delay(1000).ConfigureAwait(false);
    Console.WriteLine($"Finished processing {i}");
}

private static async Task Execute(IObservable<long> input, IObservable<Unit> terminate)
{
    await input
          .TakeUntil(terminate)
          .Select(i => Observable.FromAsync(ct => Process(ct, i)))
          .Concat();
}
...