Правильное создание Задачи для опроса и отправки дочерних задач с отменой для каждого - PullRequest
1 голос
/ 24 августа 2011

Я создаю класс диспетчера, который сам по себе является долгосрочным заданием, которое может быть отменено пользователем в любое время. Эта задача опрашивает базу данных, чтобы выяснить, нужно ли выполнить какую-либо работу, и запустит до X [5] # дочерних задач.

Насколько я могу судить - он отлично работает, но у меня есть несколько вопросов / опасений по поводу кода. Более или менее - поскольку я не смог найти другой пример этого - я делаю это правильно? Есть ли вещи, которые я мог бы улучшить?

  1. Я использую ConcurrentDictionary для отслеживания запущенных дочерних задач. В этом словаре хранится обрабатываемый RequestKey и ресурс CancellationTokenSource для этой задачи.

В: Это лучший способ сделать это? В StartDownloadProcess (это дочерняя задача) я создаю CancellationTokenSource и добавляю его в словарь, затем запускаю задачу. Я добавил к нему продолжение, которое затем удаляет элемент из словаря после завершения обработки, чтобы он не вызывался в методе Cancel.

  1. В дочерней задаче я передаю токен отмены методу, который фактически выполняет эту работу. Затем этот процесс проверит, нужно ли прерывать его, периодически проверяя этот токен. Это правильно?

  2. В методе Cancel - я создаю копию ключей в словаре, перебираю ее, пытаюсь получить доступ и удалить элемент из словаря и выдаю запрос на отмену.

В: Это лучший способ сделать это? Нужно ли ждать, чтобы увидеть, если задача на самом деле отменена? Могу ли я?
Q: Должен ли я избавляться от CTS?

  1. Я делаю Thread.Sleep в основной задаче .. хорошо / плохо? Должен ли я использовать SpinWait вместо этого? Есть ли другой способ / лучший способ заставить основной опросщик перейти в спящий режим и перезапустить его через определенные промежутки времени?

Примечание: в StartDownloadProcess я использую while (true) для цикла до завершения задачи или отменяется для повторения до j> requestKey. В реальном коде не было бы цикла while. Он просто запустит новую задачу и запустит сам процесс загрузки.

-

/// <summary>
/// Primary dispatcher token source
/// </summary>
CancellationTokenSource primaryTokenSource;
/// <summary>
/// A collection of Worker Tokens which can be used to cancel worker tasks and keep track of how many
/// there are.
/// </summary>
ConcurrentDictionary<int, CancellationTokenSource> workerTokens = new ConcurrentDictionary<int, CancellationTokenSource>();

/// <summary>
/// Runs this instance.
/// </summary>
public void Run() {
  //  Only one dispatcher can be running
  if (IsRunning)
    return;

  //  Create a new token source
  primaryTokenSource = new CancellationTokenSource();
  //  Create the cancellation token to pass into the Task
  CancellationToken token = primaryTokenSource.Token;

  //  Set flag on
  IsRunning = true;

  //  Fire off the dispatcher
  Task.Factory.StartNew(
    () => {
      //  Loop forever
      while (true) {
        //  If there are more than 5 threads running, don't add a new one
        if (workerTokens.Count < 5) {
          //  Check to see if we've been cancelled
          if (token.IsCancellationRequested)
            return;

          //  Check to see if there are pending requests
          int? requestKey = null;

          //  Query database (removed)
          requestKey = new Random().Next(1550);

          //  If we got a request, start processing it
          if (requestKey != null) {
            //  Check to see if we've been cancelled before running the child task
            if (token.IsCancellationRequested)
              return;

            //  Start the child downloader task
            StartDownloadProcess(requestKey.Value);
          }
        } else {
          //  Do nothing, we've exceeded our max tasks
          Console.WriteLine("MAX TASKS RUNNING, NOT STARTING NEW");
        }

        //  Sleep for the alloted time
        Thread.Sleep(Properties.Settings.Default.PollingInterval);
    }
  }, token)
  //  Turn running flag off
  .ContinueWith((t) => IsRunning = false)
  //  Notify that we've finished
  .ContinueWith(OnDispatcherStopped);
}

/// <summary>
/// Starts the download process.
/// </summary>
/// <param name="requestKey">The request key.</param>
private void StartDownloadProcess(int requestKey) {
  CancellationTokenSource workerTokenSource = new CancellationTokenSource();
  CancellationToken token = workerTokenSource.Token;

  //  Add the token source to the queue
  workerTokens.GetOrAdd(requestKey, workerTokenSource);

  //  Start the child downloader task
  Task.Factory.StartNew(
    () => {
      int j = 0;
      while (true) {
        if (token.IsCancellationRequested) {
          Console.WriteLine("Sub-Task Cancelled {0}", requestKey);
          return;
        }

        //  Create a new downloader, pass it the RequestKey and token
        //var downloader = new Downloader(requestKey, token);
        //downloader.Run();

        //  Simulate work
        Thread.Sleep(250);
        Console.WriteLine("SUB-Task {0} is RUNNING! - #{1}", requestKey, j);

        //  Simulate - automatically end task when j > requestkey
        if (j++ > requestKey) {
          Console.WriteLine("SUB TASK {0} IS ENDING!", requestKey);
          return;
        }
      }
    },
    token
  ).ContinueWith((t) => {
    //  If we ended naturally, the cancellationtoken will need to be removed from the dictionary
    CancellationTokenSource source = null;
    workerTokens.TryRemove(requestKey, out source);
  });
}

/// <summary>
/// Cancels this instance.
/// </summary>
public void Cancel() {
  //  Cancel the primary task first so new new child tasks are created
  if (primaryTokenSource != null)
    primaryTokenSource.Cancel();

  //  Iterate over running cancellation sources and terminate them
  foreach (var item in workerTokens.Keys.ToList()) {
    CancellationTokenSource source = null;
    if (workerTokens.TryRemove(item, out source)) {
      source.Cancel();
    }
  }
}

Кроме того, не показано в примере выше ... несколько событий также могут быть вызваны с помощью задач ... все эти события выглядят следующим образом:

public event EventHandler DispatcherStarted;
private void OnDispatcherStarted() {
  EventHandler handler = DispatcherStarted;
  if (handler != null) 
    Task.Factory.StartNew(() => handler(this, EventArgs.Empty), CancellationToken.None, TaskCreationOptions.None, taskScheduler).Wait();      
}

В методе Run () - в разные моменты он вызывал бы OnDispatcher * (); поднять события, чтобы звонящий мог подписаться и получать уведомления. Те задачи, которые создает событие, будут выполняться в основном потоке.

  • Дополнительный вопрос: я пытался сделать диспетчер универсальным и передать объект "poller", который проверяет базу данных ... и, в случае успеха, создает дочернюю задачу и передает необходимые ей параметры. Я столкнулся с некоторыми проблемами, такими как ... как передавать данные, какие объекты передавать .. Интерфейсы / Классы / Func <,,,> / Action <> и т. Д. Как я могу превратить это в универсальный диспетчер / поллер, который работает A, который возвращает параметры (я думал словарь), который затем создает дочернюю задачу B, которая использует эти параметры и поддерживает отмену и уведомление о событии?

1 Ответ

1 голос
/ 27 августа 2011

Я быстро посмотрел, киньте код и у меня мало комментариев:

  • Использование флага IsRunning не является потокобезопасным, несколько потоков могут считать его ложным, а затем одновременно установить для него значение true, и у вас будет более одного потока диспетчера !, чтобы избежать необходимости использовать Interlocked.CompareExchange для установки это, а также вам нужно пометить его как voaltile.
  • Я бы рекомендовал не использовать Sleep, также SpinWait здесь не поможет, вы можете использовать объект Timer, который объединяет базу данных и добавляет запросы в коллекцию BlockingCollection, из которой диспетчер получает запросы.
  • Продолжение дочерней задачи всегда будет выполняться, даже если родительская задача отменена, этого можно избежать, передав эту TaskContinuationOptions.NotOnCanceled
...