Очередь асинхронной задачи в C# - PullRequest
0 голосов
/ 02 апреля 2020

У меня есть несколько методов, которые сообщают некоторые данные в базу данных. Мы хотим вызывать все вызовы службы данных асинхронно. Эти вызовы к службе данных завершены, и поэтому мы хотим убедиться, что эти вызовы DS выполняются по порядку в любой момент времени. Первоначально я использовал asyn c await для каждого из этих методов, и каждый из вызовов выполнялся асинхронно, но мы выяснили, что они не совпадают, тогда есть место для ошибок.

Итак, я подумал, что мы должны поставить в очередь все эти асинхронные задачи и отправить их в отдельном потоке, но я хочу знать, какие у нас есть варианты? Я наткнулся на «Семафор Слим». Подойдет ли это в моем случае использования? Или какие другие варианты подойдут для моего варианта использования? Пожалуйста, ведите меня.

Итак, что у меня есть в моем коде сейчас

public static SemaphoreSlim mutex = new SemaphoreSlim(1);

//first DS call 

 public async Task SendModuleDataToDSAsync(Module parameters)
    {
        var tasks1 = new List<Task>();
        var tasks2 = new List<Task>();

        //await mutex.WaitAsync(); **//is this correct way to use SemaphoreSlim ?**
        foreach (var setting in Module.param)
        {
           Task job1 = SaveModule(setting);
           tasks1.Add(job1);
           Task job2= SaveModule(GetAdvancedData(setting));
           tasks2.Add(job2);
        }

        await Task.WhenAll(tasks1);
        await Task.WhenAll(tasks2);

        //mutex.Release(); // **is this correct?**
    }

 private async Task SaveModule(Module setting)
    {
        await Task.Run(() =>
            {
             // Invokes Calls to DS
             ... 
            });
    }

// где-то в главном потоке, вызывая второй вызов DS

  //Second DS Call
 private async Task SendInstrumentSettingsToDS(<param1>, <param2>)
 {
    //await mutex.WaitAsync();// **is this correct?**
    await Task.Run(() =>
            {
                 //TrackInstrumentInfoToDS
                 //mutex.Release();// **is this correct?**
            });
    if(param2)
    {
        await Task.Run(() =>
               {
                  //TrackParam2InstrumentInfoToDS
               });
    }
 }

enter image description here

enter image description here

Ответы [ 4 ]

3 голосов
/ 02 апреля 2020

Первоначально я использовал asyn c await для каждого из этих методов, и каждый из вызовов выполнялся асинхронно, но мы выяснили, что они не совпадают, тогда есть место для ошибок.

Итак, я подумал, что мы должны поставить все эти асинхронные задачи в очередь и отправить их в отдельном потоке, но я хочу знать, какие у нас есть варианты? Я столкнулся с 'SemaphoreSlim'.

SemaphoreSlim действительно ограничивает асинхронный код для выполнения по одному и является допустимой формой взаимного исключения, Однако, поскольку вызовы «вне последовательности» могут вызвать ошибки, то SemaphoreSlim является , а не подходящим решением, поскольку оно не гарантирует FIFO.

В более общем смысле примитив синхронизации отсутствует гарантирует FIFO, потому что это может вызвать проблемы из-за побочных эффектов, таких как конвои блокировки. С другой стороны, естественно, что структуры данных должны быть строго FIFO.

Таким образом, вам нужно использовать собственную очередь FIFO, а не неявную очередь выполнения. Channels - это хорошая, производительная, asyn c -совместимая очередь, но поскольку вы работаете на более старой версии C # /. NET, BlockingCollection<T> будет работать:

public sealed class ExecutionQueue
{
  private readonly BlockingCollection<Func<Task>> _queue = new BlockingCollection<Func<Task>>();

  public ExecutionQueue() => Complete = Task.Run(() => ProcessQueueAsync());

  public Task Completion { get; }

  public void Complete() => _queue.CompleteAdding();

  private async Task ProcessQueueAsync()
  {
    foreach (var value in _queue.GetConsumingEnumerable())
      await value();
  }
}

Единственная хитрость часть с этой настройкой, как поставить в очередь работу. С точки зрения кода, ставящего в очередь работу, они хотят знать, когда лямбда выполняется, а не когда лямбда находится в очереди. С точки зрения метода очереди (который я называю Run), метод должен завершить свою возвращенную задачу только после выполнения лямбды. Таким образом, вы можете написать метод очереди примерно так:

public Task Run(Func<Task> lambda)
{
  var tcs = new TaskCompletionSource<object>();
  _queue.Add(async () =>
  {
    // Execute the lambda and propagate the results to the Task returned from Run
    try
    {
      await lambda();
      tcs.TrySetResult(null);
    }
    catch (OperationCanceledException ex)
    {
      tcs.TrySetCanceled(ex.CancellationToken);
    }
    catch (Exception ex)
    {
      tcs.TrySetException(ex);
    }
  });
  return tcs.Task;
}

Этот метод не так совершенен, как мог бы быть. Если задача завершается с более чем одним исключением (это нормально для параллельного кода), сохраняется только первое (это нормально для асинхронного кода c). Там также крайний случай вокруг обработки OperationCanceledException. Но этот код достаточно хорош для большинства случаев.

Теперь вы можете использовать его так:

public static ExecutionQueue _queue = new ExecutionQueue();

public async Task SendModuleDataToDSAsync(Module parameters)
{
  var tasks1 = new List<Task>();
  var tasks2 = new List<Task>();

  foreach (var setting in Module.param)
  {
    Task job1 = _queue.Run(() => SaveModule(setting));
    tasks1.Add(job1);
    Task job2 = _queue.Run(() => SaveModule(GetAdvancedData(setting)));
    tasks2.Add(job2);
  }

  await Task.WhenAll(tasks1);
  await Task.WhenAll(tasks2);
}
1 голос
/ 02 апреля 2020

Имейте в виду, что ваше первое решение постановки всех задач в списки не гарантирует, что задачи выполняются одна за другой. Все они работают параллельно, потому что их не ждут, пока не начнутся следующие задачи.

Так что да, вы должны использовать SemapohoreSlim, чтобы использовать асин c блокировку и ожидание. Простая реализация может быть:

private readonly SemaphoreSlim _syncRoot = new SemaphoreSlim(1);

public async Task SendModuleDataToDSAsync(Module parameters)
{
    await this._syncRoot.WaitAsync();
    try
    {
        foreach (var setting in Module.param)
        {
           await SaveModule(setting);
           await SaveModule(GetAdvancedData(setting));
        }
    }
    finally
    {
        this._syncRoot.Release();
    }
}

Если вы можете использовать Nito.AsyncEx , код можно упростить до:

public async Task SendModuleDataToDSAsync(Module parameters)
{
    using var lockHandle = await this._syncRoot.LockAsync();

    foreach (var setting in Module.param)
    {
       await SaveModule(setting);
       await SaveModule(GetAdvancedData(setting));
    }
}
0 голосов
/ 02 апреля 2020

Исходя из вашего комментария под ответом Alexeis, ваш подход с SemaphoreSlim верен.

Предполагается, что методы SendInstrumentSettingsToDS и SendModuleDataToDSAsync являются членами одного класса. Вам просто нужно иметь переменную экземпляра для SemaphoreSlim, а затем в начале каждого метода, который нуждается в вызове синхронизации await lock.WaitAsync() и вызове lock.Release() в блоке finally.

public async Task SendModuleDataToDSAsync(Module parameters)
{
    await lock.WaitAsync();
    try
    {
        ...
    }
    finally
    {
        lock.Release();
    }
}

private async Task SendInstrumentSettingsToDS(<param1>, <param2>)
{
    await lock.WaitAsync();
    try
    {
        ...
    }
    finally
    {
        lock.Release();
    }
}

, и важно, чтобы вызов lock.Release() находится в блоке finally, поэтому, если где-то в коде блока try создается исключение, семафор освобождается.

0 голосов
/ 02 апреля 2020

Один из вариантов - поставить в очередь операции, которые будут создавать задачи, а не ставить в очередь уже запущенные задачи, как делает код в вопросе.

Псевдокод без блокировки:

 Queue<Func<Task>> tasksQueue = new Queue<Func<Task>>();

 async Task RunAllTasks()
 {
      while (tasksQueue.Count > 0)
      { 
           var taskCreator = tasksQueue.Dequeu(); // get creator 
           var task = taskCreator(); // staring one task at a time here
           await task; // wait till task completes
      }
  }

  // note that declaring createSaveModuleTask does not  
  // start SaveModule task - it will only happen after this func is invoked
  // inside RunAllTasks
  Func<Task> createSaveModuleTask = () => SaveModule(setting);

  tasksQueue.Add(createSaveModuleTask);
  tasksQueue.Add(() => SaveModule(GetAdvancedData(setting)));
  // no DB operations started at this point

  // this will start tasks from the queue one by one.
  await RunAllTasks();

Использование ConcurrentQueue было бы, вероятно, правильно в реальном коде. Вам также необходимо знать общее количество ожидаемых операций, которые нужно остановить, когда все запускаются и ожидаются одна за другой.

...