C# Очередь asyn c Задача с использованием BlockingCollection и очередь обработки только после того, как значение, возвращенное для предыдущей задачи получения в очереди - PullRequest
0 голосов
/ 09 апреля 2020

Недавно у меня было требование ставить в очередь асинхронные задачи c, и я познакомился с BlockingCollection по этой ссылке Очередь асинхронной задачи в C# Это сработало, и у меня появилось небольшое изменение в требовании и нужно ваше руководство. Я использую коллекцию BlockingCollection, как в ответе @Stephen Cleary

Это коллекция BlockingCollection по этой ссылке

public sealed class ExecutionQueue
{
  //private readonly BlockingCollection<Func<Task>> _queue = new BlockingCollection<Func<Task>>();//commented this
  private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>();

  public ExecutionQueue() => Complete = Task.Run(() => ProcessQueueAsync());

  public Task Completion { get; }

  public void Complete() => _queue.CompleteAdding();

  private async Task ProcessQueueAsync()
  {
    foreach (var value in _queue.GetConsumingEnumerable())
      await value();
  }
}

//public Task Run(Func<Task> lambda)
public Task Run(<Task> lambda)
{
  var tcs = new TaskCompletionSource<object>();
  _queue.Add(lamda);
  return tcs.Task;
}
  1. Мне нужно поставить в очередь определенные задачи DataBase, которые находятся в обычном пустоте метод. Я не могу изменить подпись этого метода. Как мне их сделать?
 public static ExecutionQueue taskQueue = new ExecutionQueue();

 private void SaveValesToDB(...)
 {
    var item = GetID(...);
    ...
    taskQueue.Run(Task.Run(() =>
    {
       DBInstance.DBSaveValue1(...); // is it correct to wrap with Task.Run and add to queue? it should be queued and run asynchronously
     });
    ...
 }
Мы сохраняем и извлекаем данные из БД. Итак, когда мы ставим в очередь вызов БД, который возвращает что-то вроде геттера, мы хотим убедиться, что до тех пор, пока мы не получим возвращаемое значение, мы не обработаем другие элементы, находящиеся в очереди.
private void SaveValesToDB(...)
{
 ...
 taskQueue.Run(Task.Run(() =>
 {
    DBInstance.DBSaveValue1(...); // is this correct? it should be queued and run asynchronously
  });
 ...
 taskQueue.Run(Task.Run(() =>
 {
    var result1 = DBInstance.DBGetValue2(...); // should be queued and run asynchronously; 
    LogData(result1);// not a DB call but believe it should be wrapped in here for the result1, correct?
 });

 /*so in above Task.Run,  i want to ensure that until i receive result1 
 i don't process other items in the queue even 
 if they are added. how can i do that ? 
 The main thread should continue. */
 ...
 var result 2 = DBInstance.DBGetValue3(...); // should be queued and run asynchronously

 UpdateAdvancedLod(result1 +" "+result2);// here, should i block main thread until i get result1 ?
}

Как обрабатывать ошибки?

Пожалуйста, ведите меня.

Отредактировано:

if using Func<Task> in public Task Run(Func<Task> lambda) then is the below correct?

taskQueue.Run(async () =>
                {
                    await Task.Run(() =>
                    {
                        DBInstance.DBSaveValue1(...);//is this correct
                    });
                }
                );

1 Ответ

0 голосов
/ 09 апреля 2020

Вы можете добавить этот метод к классу ExecutionQueue Стивена Клири:

public Task Run(Action action)
{
    return Run(() => Task.Run(action));
}

Это перегрузка существующего метода public Task Run(Func<Task> lambda). Он делегирует выполнение предоставленного action потоку ThreadPool.

Пример использования:

var id = GetID();
var task = taskQueue.Run(() => DBInstance.DBSaveValue1(id));
await task; // Optional

Обновление: Для распространения ошибки уведомляя основной поток, вы можете улучшить класс ExecutionQueue с помощью события Error, которое будет вызываться в захваченном контексте (захваченном во время создания экземпляра).

private readonly SynchronizationContext _capturedContext;

public event EventHandler<Exception> Error;

public ExecutionQueue() // Constructor
{
    _capturedContext = SynchronizationContext.Current ?? new SynchronizationContext();
    Completion = Task.Run(() => ProcessQueueAsync());
}

private void OnError(Exception ex)
{
    var handler = Error; if (handler == null) return;
    _capturedContext.Post(_ => handler.Invoke(this, ex), null);
}

OnError должен вызываться изнутри блока catch (Exception ex). Это будет работать с Windows приложениями Forms и приложениями WPF, потому что их поток пользовательского интерфейса оснащен SynchronizationContext. Он не будет работать с консольным приложением, потому что там нет SynchronizationContext (событие Error будет вызвано в случайном потоке ThreadPool).

...