Как выполнить функции в теме синхронно? - PullRequest
0 голосов
/ 13 мая 2019

Проблема, которую я хочу решить, заключается в том, что я хочу запустить асинхронное действие из нескольких потоков, но разрешено только одно выполнение действия за раз.

Действие заключается в связи с аппаратным устройством, и устройство может обрабатывать только один запрос за один раз.

Одной из моих идей было синхронизировать это с System.Reactive.Subjects.Subject. Несколько потоков могут вызывать OnNext, и субъект должен был выполнить один запрос за другим. Я написал этот (вероятно, очень наивный) код:

static void Main(string[] args)
{
    var source = new System.Reactive.Subjects.Subject<Func<Task<int>>>();

    source
        // from http://code.fitness/post/2016/11/rx-selectmany-deep-dive.html
        .SelectMany(async x => await x.Invoke()) 
        .Subscribe(result => Console.WriteLine($"Work of index {result} completed"));

    var noOfThreads = 3;
    for (var i = 0; i < noOfThreads; i++)
    {
        var i1 = i;
        var t = new Thread(() => source.OnNext(() => doWork(i1)));
        t.Start();
    }

    Console.ReadKey();
}

static async Task<int> doWork(int index)
{
    Console.WriteLine($"Start work {index}");
    await Task.Delay(1000);
    Console.WriteLine($"Stop work {index}");
    return index;
}

Моя надежда была такой:

Start work 2
Stop work 2
Work of index 2 completed
Start work 0
Stop work 0
Work of index 0 completed

Вместо этого я получаю:

Start work 0
Start work 1
Start work 2
Stop work 1
Stop work 0
Work of index 1 completed
Work of index 0 completed
Stop work 2
Work of index 2 completed

Это показывает, что все действия начинаются с самого начала, и не нужно ждать завершения других. Интересно, является ли Reactive правильным способом сделать это, или есть какой-то еще умный способ, которым я могу выполнить свою задачу.

Редактировать : чтобы получить дополнительную справочную информацию, зачем мне это нужно: приложение связывается с устройством. Это устройство имеет последовательный интерфейс и может обрабатывать только одну команду за раз. Так что у меня есть тема, которая постоянно получает обновления статуса, такие как:

while (true)
{
    ReadPosition();
    ReadTempereatures();
    ReadErrors();
}

Тогда есть пользовательский интерфейс, где пользователи могут инициировать некоторые действия на устройстве. Мое текущее решение - это очередь, в которой я ставлю свои команды в очередь. Это работает, но мне было интересно, будет ли работать подход на основе событий.

1 Ответ

4 голосов
/ 13 мая 2019

Вы смешиваете Rx, Задачи и потоки. Не удивительно, что это сходит с рельсов. Выберите один подход - Rx - лучший, ИМХО, - и у вас все будет хорошо.

Достаточно ли этого:

static void Main(string[] args)
{
    var source = new Subject<Func<int>>();

    source
        .Synchronize()
        .Select(x => x())
        .Subscribe(result => Console.WriteLine($"Work of index {result} completed"));

    var noOfThreads = 3;
    for (var i = 0; i < noOfThreads; i++)
    {
        var i1 = i;
        var t = new Thread(() => source.OnNext(() => doWork(i1)));
        t.Start();
    }

    Console.ReadLine();
}

static int doWork(int index)
{
    Console.WriteLine($"Start work {index}");
    Thread.Sleep(1000);
    Console.WriteLine($"Stop work {index}");
    return index;
}

Это дает:

Start work 0
Stop work 0
Work of index 0 completed
Start work 2
Stop work 2
Work of index 2 completed
Start work 1
Stop work 1
Work of index 1 completed

Ключ должен вызвать .Synchronize(), чтобы привести все вызывающие потоки в контракт Rx.

...