Я хочу написать функцию, которая будет возвращать элементы асинхронно:
public static async Task<IEnumerable<T>> GetItems()
{
while (await ShouldLoopAsync())
{
yield return await GetItemAsync();
}
}
Конечно, это не будет работать.В 1 они предлагают следующее решение:
return Observable.Create<T>(
observer => Task.Run(async () =>
{
while (await ShouldLoopAsync())
{
observer.OnNext(await getAsync());
}
observer.OnCompleted();
}
)
);
Мне не нравится принятое решение из 1 , потому что оно использует метод Task.Run внутри Observable.Create.Я хочу какое-то решение с реактивными расширениями, которые будут работать на текущем TaskScheduler без использования других потоков, кроме существующего.Я могу написать такое решение декларативным способом, используя .net события, но я не могу с Rx.
ОБНОВЛЕНИЕ
Я попытался удалить Task.Run
и написать следующий тест:
[TestClass]
public class ReactiveWithTaskIEnumerable
{
[TestMethod]
public void ReactibeShouldProcessValuesWithoutnakoplenie()
{
Trace.WriteLine($"Start Thread {Thread.CurrentThread.ManagedThreadId}");
var i = GetIntsAsync()
.Subscribe(
onNext: p =>
{
Trace.WriteLine($"onNext Thread {Thread.CurrentThread.ManagedThreadId} {p}");
},
onCompleted: () =>
{
Trace.WriteLine($"onCompleted Thread {Thread.CurrentThread.ManagedThreadId}");
},
onError: p =>
{
Trace.WriteLine($"onError Thread {Thread.CurrentThread.ManagedThreadId} {p}");
throw p;
}
);
Trace.WriteLine($"Finish Thread {Thread.CurrentThread.ManagedThreadId}");
}
private IObservable<int> GetIntsAsync()
{
return Observable.Create<int>(
observer => async () =>
{
Trace.WriteLine($"Produce {Thread.CurrentThread.ManagedThreadId}");
try
{
foreach (var i in Enumerable.Range(1, 10))
{
await Task.Delay(1000);
observer.OnNext(1);
}
}
catch (Exception e)
{
observer.OnError(e);
}
observer.OnCompleted();
});
}
}
И вывод:
Start Thread 3
Finish Thread 3
Таким образом, внутренняя задача вообще не запускается.
После этого я изменил GetIntsAsync
на следующее:
private IObservable<int> GetIntsAsync()
{
return Observable.Create<int>(
observer =>
{
async Task Lambda()
{
Trace.WriteLine($"Produce {Thread.CurrentThread.ManagedThreadId}");
try
{
foreach (var i in Enumerable.Range(1, 10))
{
await Task.Delay(1000);
observer.OnNext(1);
}
}
catch (Exception e)
{
observer.OnError(e);
}
observer.OnCompleted();
}
return Lambda();
});
}
И это дает следующий вывод:
Debug Trace:
Start Thread 3
Produce 3
Finish Thread 3
Он близок к выводу с Task.Run
:
Трассировка отладки: Начать поток 3 Создать 9 Завершить поток3
Теперь мне нужно подождать, пока наблюдаемое не завершится