Проблема, которую я хочу решить, заключается в том, что я хочу запустить асинхронное действие из нескольких потоков, но разрешено только одно выполнение действия за раз.
Действие заключается в связи с аппаратным устройством, и устройство может обрабатывать только один запрос за один раз.
Одной из моих идей было синхронизировать это с System.Reactive.Subjects.Subject
. Несколько потоков могут вызывать OnNext
, и субъект должен был выполнить один запрос за другим. Я написал этот (вероятно, очень наивный) код:
static void Main(string[] args)
{
var source = new System.Reactive.Subjects.Subject<Func<Task<int>>>();
source
// from http://code.fitness/post/2016/11/rx-selectmany-deep-dive.html
.SelectMany(async x => await x.Invoke())
.Subscribe(result => Console.WriteLine($"Work of index {result} completed"));
var noOfThreads = 3;
for (var i = 0; i < noOfThreads; i++)
{
var i1 = i;
var t = new Thread(() => source.OnNext(() => doWork(i1)));
t.Start();
}
Console.ReadKey();
}
static async Task<int> doWork(int index)
{
Console.WriteLine($"Start work {index}");
await Task.Delay(1000);
Console.WriteLine($"Stop work {index}");
return index;
}
Моя надежда была такой:
Start work 2
Stop work 2
Work of index 2 completed
Start work 0
Stop work 0
Work of index 0 completed
Вместо этого я получаю:
Start work 0
Start work 1
Start work 2
Stop work 1
Stop work 0
Work of index 1 completed
Work of index 0 completed
Stop work 2
Work of index 2 completed
Это показывает, что все действия начинаются с самого начала, и не нужно ждать завершения других. Интересно, является ли Reactive
правильным способом сделать это, или есть какой-то еще умный способ, которым я могу выполнить свою задачу.
Редактировать : чтобы получить дополнительную справочную информацию, зачем мне это нужно: приложение связывается с устройством. Это устройство имеет последовательный интерфейс и может обрабатывать только одну команду за раз. Так что у меня есть тема, которая постоянно получает обновления статуса, такие как:
while (true)
{
ReadPosition();
ReadTempereatures();
ReadErrors();
}
Тогда есть пользовательский интерфейс, где пользователи могут инициировать некоторые действия на устройстве.
Мое текущее решение - это очередь, в которой я ставлю свои команды в очередь. Это работает, но мне было интересно, будет ли работать подход на основе событий.