Как использовать SelectMany для асинхронной логики в последовательности Observable.FromEventPattern? - PullRequest
0 голосов
/ 12 июня 2019

У меня есть класс, который предоставляет следующую наблюдаемую последовательность:

internal IObservable<TaskDoneEventArgs> WhenTaskDone => Observable
    .FromEventPattern<TaskDoneEventHandler, TaskDoneEventArgs>(
            handler => NiTask.Done += handler,
            handler => NiTask.Done -= handler)
    .Select(x => x.EventArgs);

Я хотел бы выполнить некоторую асинхронную логику TPL всякий раз, когда из последовательности наблюдается новый элемент.Насколько я понимаю, SelectMany() - хороший способ справиться с асинхронной логикой.Тем не менее у меня возникают проблемы с правильным синтаксисом.

Моя попытка сделать следующее, но она не будет компилироваться:

_output.WhenTaskDone
    .SelectMany(async _ => await StopDelivery())
    .Subscribe(_ => Debug.WriteLine("Delivery stopped"));

Я получаю следующую ошибку, связанную с SelectMany():

  Error CS0411: The type arguments for method 'Observable.SelectMany<TSource, TOther>(IObservable<TSource>, IObservable<TOther>)' cannot be inferred from the usage. Try specifying the type arguments explicitly. (85, 18)

Подпись функции StopDelivery() выглядит следующим образом:

internal Task StopDelivery()

Что я здесь упускаю и что делаю не так?

1 Ответ

1 голос
/ 13 июня 2019

Если вы хотите вставить одиночное действие, основанное на методе, который возвращает Task, в конвейер, чтобы Task завершился до того, как исходное значение будет возвращено подписчику, тогда вам нужно сделать этот видиз этого:

.SelectMany(_ => Observable.FromAsync(() => StopDelivery()), (x, y) => x)

Вот полный рабочий пример:

void Main()
{
    WhenTaskDone
        .SelectMany(_ => Observable.FromAsync(() => StopDelivery()), (x, y) => x)
        .Subscribe(_ => Debug.WriteLine("Delivery stopped"));

    NiTask.OnDone();
}

private NiTaskClass NiTask = new NiTaskClass();

internal IObservable<TaskDoneEventArgs> WhenTaskDone =>
    Observable
        .FromEventPattern<TaskDoneEventHandler, TaskDoneEventArgs>(
            handler => NiTask.Done += handler,
            handler => NiTask.Done -= handler)
        .Select(x => x.EventArgs);


internal Task StopDelivery() => Task.Run(() => Console.WriteLine("StopDelivery"));

public delegate void TaskDoneEventHandler(object sender, TaskDoneEventArgs e);

public class TaskDoneEventArgs : EventArgs { }

public class NiTaskClass
{
    public event TaskDoneEventHandler Done;
    public void OnDone()
    {
        this.Done?.Invoke(this, new TaskDoneEventArgs());
    }
}

Это было собрано в LINQPad - после добавления System.Reactive через NuGet вы можетесделайте копию и вставьте, и это должно работать нормально.

...