Реактивный Observable.Create для асинхронного производителя без Task.Run - PullRequest
0 голосов
/ 11 октября 2018

Я хочу написать функцию, которая будет возвращать элементы асинхронно:

public static async Task<IEnumerable<T>> GetItems()
{
    while (await ShouldLoopAsync())
    {
        yield return await GetItemAsync();
    }
}

Конечно, это не будет работать.В 1 они предлагают следующее решение:

return Observable.Create<T>(
    observer => Task.Run(async () =>
        {
            while (await ShouldLoopAsync())
            {
                observer.OnNext(await getAsync());
            }
            observer.OnCompleted();
        }
    )
);

Мне не нравится принятое решение из 1 , потому что оно использует метод Task.Run внутри Observable.Create.Я хочу какое-то решение с реактивными расширениями, которые будут работать на текущем TaskScheduler без использования других потоков, кроме существующего.Я могу написать такое решение декларативным способом, используя .net события, но я не могу с Rx.

ОБНОВЛЕНИЕ

Я попытался удалить Task.Run и написать следующий тест:

[TestClass]
public class ReactiveWithTaskIEnumerable
{
    [TestMethod]
    public void ReactibeShouldProcessValuesWithoutnakoplenie()
    {
        Trace.WriteLine($"Start Thread {Thread.CurrentThread.ManagedThreadId}");
        var i = GetIntsAsync()
            .Subscribe(
            onNext: p =>
            {
                Trace.WriteLine($"onNext Thread {Thread.CurrentThread.ManagedThreadId} {p}");
            },
            onCompleted: () =>
            {
                Trace.WriteLine($"onCompleted Thread {Thread.CurrentThread.ManagedThreadId}");
            },
            onError: p =>
            {
                Trace.WriteLine($"onError Thread {Thread.CurrentThread.ManagedThreadId} {p}");
                throw p;
            }
            );
        Trace.WriteLine($"Finish Thread {Thread.CurrentThread.ManagedThreadId}");

    }

    private IObservable<int> GetIntsAsync()
    {
        return Observable.Create<int>(
            observer => async () =>
            {
                Trace.WriteLine($"Produce {Thread.CurrentThread.ManagedThreadId}");
                try
                {
                    foreach (var i in Enumerable.Range(1, 10))
                    {
                        await Task.Delay(1000);
                        observer.OnNext(1);
                    }
                }
                catch (Exception e)
                {
                    observer.OnError(e);
                }
                observer.OnCompleted();
            });
    }
}

И вывод:

Start Thread 3
Finish Thread 3

Таким образом, внутренняя задача вообще не запускается.

После этого я изменил GetIntsAsync на следующее:

    private IObservable<int> GetIntsAsync()
    {
        return Observable.Create<int>(
            observer =>
            {
                async Task Lambda()
                {
                    Trace.WriteLine($"Produce {Thread.CurrentThread.ManagedThreadId}");
                    try
                    {
                        foreach (var i in Enumerable.Range(1, 10))
                        {
                            await Task.Delay(1000);
                            observer.OnNext(1);
                        }
                    }
                    catch (Exception e)
                    {
                        observer.OnError(e);
                    }
                    observer.OnCompleted();
                }

                return Lambda();
            });
    }

И это дает следующий вывод:

Debug Trace:
Start Thread 3
Produce 3
Finish Thread 3

Он близок к выводу с Task.Run:

Трассировка отладки: Начать поток 3 Создать 9 Завершить поток3

Теперь мне нужно подождать, пока наблюдаемое не завершится

1 Ответ

0 голосов
/ 15 октября 2018

Посетите NuGetting "System.Interactive", а затем попробуйте это:

public static IEnumerable<int> GetItems()
{
    return EnumerableEx.Create<int>(async y =>
    {
        while (await ShouldLoopAsync())
        {
            await y.Return(GetItemAsync());
        }
        await y.Break();
    });
}
...