Я создаю класс диспетчера, который сам по себе является долгосрочным заданием, которое может быть отменено пользователем в любое время. Эта задача опрашивает базу данных, чтобы выяснить, нужно ли выполнить какую-либо работу, и запустит до X [5] # дочерних задач.
Насколько я могу судить - он отлично работает, но у меня есть несколько вопросов / опасений по поводу кода. Более или менее - поскольку я не смог найти другой пример этого - я делаю это правильно? Есть ли вещи, которые я мог бы улучшить?
- Я использую ConcurrentDictionary для отслеживания запущенных дочерних задач. В этом словаре хранится обрабатываемый RequestKey и ресурс CancellationTokenSource для этой задачи.
В: Это лучший способ сделать это? В StartDownloadProcess (это дочерняя задача) я создаю CancellationTokenSource и добавляю его в словарь, затем запускаю задачу. Я добавил к нему продолжение, которое затем удаляет элемент из словаря после завершения обработки, чтобы он не вызывался в методе Cancel.
В дочерней задаче я передаю токен отмены методу, который фактически выполняет эту работу. Затем этот процесс проверит, нужно ли прерывать его, периодически проверяя этот токен. Это правильно?
В методе Cancel - я создаю копию ключей в словаре, перебираю ее, пытаюсь получить доступ и удалить элемент из словаря и выдаю запрос на отмену.
В: Это лучший способ сделать это? Нужно ли ждать, чтобы увидеть, если задача на самом деле отменена? Могу ли я?
Q: Должен ли я избавляться от CTS?
- Я делаю Thread.Sleep в основной задаче .. хорошо / плохо? Должен ли я использовать SpinWait вместо этого? Есть ли другой способ / лучший способ заставить основной опросщик перейти в спящий режим и перезапустить его через определенные промежутки времени?
Примечание: в StartDownloadProcess я использую while (true) для цикла до завершения задачи или отменяется для повторения до j> requestKey. В реальном коде не было бы цикла while. Он просто запустит новую задачу и запустит сам процесс загрузки.
-
/// <summary>
/// Primary dispatcher token source
/// </summary>
CancellationTokenSource primaryTokenSource;
/// <summary>
/// A collection of Worker Tokens which can be used to cancel worker tasks and keep track of how many
/// there are.
/// </summary>
ConcurrentDictionary<int, CancellationTokenSource> workerTokens = new ConcurrentDictionary<int, CancellationTokenSource>();
/// <summary>
/// Runs this instance.
/// </summary>
public void Run() {
// Only one dispatcher can be running
if (IsRunning)
return;
// Create a new token source
primaryTokenSource = new CancellationTokenSource();
// Create the cancellation token to pass into the Task
CancellationToken token = primaryTokenSource.Token;
// Set flag on
IsRunning = true;
// Fire off the dispatcher
Task.Factory.StartNew(
() => {
// Loop forever
while (true) {
// If there are more than 5 threads running, don't add a new one
if (workerTokens.Count < 5) {
// Check to see if we've been cancelled
if (token.IsCancellationRequested)
return;
// Check to see if there are pending requests
int? requestKey = null;
// Query database (removed)
requestKey = new Random().Next(1550);
// If we got a request, start processing it
if (requestKey != null) {
// Check to see if we've been cancelled before running the child task
if (token.IsCancellationRequested)
return;
// Start the child downloader task
StartDownloadProcess(requestKey.Value);
}
} else {
// Do nothing, we've exceeded our max tasks
Console.WriteLine("MAX TASKS RUNNING, NOT STARTING NEW");
}
// Sleep for the alloted time
Thread.Sleep(Properties.Settings.Default.PollingInterval);
}
}, token)
// Turn running flag off
.ContinueWith((t) => IsRunning = false)
// Notify that we've finished
.ContinueWith(OnDispatcherStopped);
}
/// <summary>
/// Starts the download process.
/// </summary>
/// <param name="requestKey">The request key.</param>
private void StartDownloadProcess(int requestKey) {
CancellationTokenSource workerTokenSource = new CancellationTokenSource();
CancellationToken token = workerTokenSource.Token;
// Add the token source to the queue
workerTokens.GetOrAdd(requestKey, workerTokenSource);
// Start the child downloader task
Task.Factory.StartNew(
() => {
int j = 0;
while (true) {
if (token.IsCancellationRequested) {
Console.WriteLine("Sub-Task Cancelled {0}", requestKey);
return;
}
// Create a new downloader, pass it the RequestKey and token
//var downloader = new Downloader(requestKey, token);
//downloader.Run();
// Simulate work
Thread.Sleep(250);
Console.WriteLine("SUB-Task {0} is RUNNING! - #{1}", requestKey, j);
// Simulate - automatically end task when j > requestkey
if (j++ > requestKey) {
Console.WriteLine("SUB TASK {0} IS ENDING!", requestKey);
return;
}
}
},
token
).ContinueWith((t) => {
// If we ended naturally, the cancellationtoken will need to be removed from the dictionary
CancellationTokenSource source = null;
workerTokens.TryRemove(requestKey, out source);
});
}
/// <summary>
/// Cancels this instance.
/// </summary>
public void Cancel() {
// Cancel the primary task first so new new child tasks are created
if (primaryTokenSource != null)
primaryTokenSource.Cancel();
// Iterate over running cancellation sources and terminate them
foreach (var item in workerTokens.Keys.ToList()) {
CancellationTokenSource source = null;
if (workerTokens.TryRemove(item, out source)) {
source.Cancel();
}
}
}
Кроме того, не показано в примере выше ... несколько событий также могут быть вызваны с помощью задач ... все эти события выглядят следующим образом:
public event EventHandler DispatcherStarted;
private void OnDispatcherStarted() {
EventHandler handler = DispatcherStarted;
if (handler != null)
Task.Factory.StartNew(() => handler(this, EventArgs.Empty), CancellationToken.None, TaskCreationOptions.None, taskScheduler).Wait();
}
В методе Run () - в разные моменты он вызывал бы OnDispatcher * (); поднять события, чтобы звонящий мог подписаться и получать уведомления. Те задачи, которые создает событие, будут выполняться в основном потоке.
- Дополнительный вопрос: я пытался сделать диспетчер универсальным и передать объект "poller", который проверяет базу данных ... и, в случае успеха, создает дочернюю задачу и передает необходимые ей параметры. Я столкнулся с некоторыми проблемами, такими как ... как передавать данные, какие объекты передавать .. Интерфейсы / Классы / Func <,,,> / Action <> и т. Д. Как я могу превратить это в универсальный диспетчер / поллер, который работает A, который возвращает параметры (я думал словарь), который затем создает дочернюю задачу B, которая использует эти параметры и поддерживает отмену и уведомление о событии?