Изящно убивает задание TPL - PullRequest
0 голосов
/ 11 ноября 2011

Я пытаюсь создать API-подобную вещь на основе набора служб REST. Существуют в основном разные операции, которые сообщают о своем прогрессе и поддерживают отмену. Я решил использовать TPL для выполнения асинхронных функций и управляемой загрузки. Вот моя самая основная операция, которая должна быть унаследована соответственно:

using FileOnlineCore.Enums;
using FileOnlineCore.EventArguments;
using FileOnlineCore.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace FileOnlineCore.Operations {

    public abstract class BaseOperation {

        #region Store
        private readonly Task _InnerTask;
        private readonly CancellationTokenSource _InnerCancelTokenSource;
        private readonly CancellationToken _InnerCancelToken;

        public Task InnerTask {
            get { return _InnerTask; }
        }

        protected CancellationTokenSource InnerCancelTokenSource {
            get { return _InnerCancelTokenSource; }
        }

        protected CancellationToken InnerCancelToken {
            get { return _InnerCancelToken; }
        }

        public Int32 TaskId {
            get { return _InnerTask != null ? _InnerTask.Id : -1; }
        }

        public OperationRunningStatus Status { get; set; }
        public OperationType TaskType { get; set; }
        public OperationProgressReportEventHandler onProgressReport;
        public OperationCompletedEventHandler onOperationCompleted;
        #endregion

        public BaseOperation() {
            _InnerCancelTokenSource = new CancellationTokenSource();
            _InnerCancelToken = _InnerCancelTokenSource.Token;
            _InnerTask = new Task(() => PerformWork(), _InnerCancelToken, TaskCreationOptions.None);
            Status = OperationRunningStatus.Idle;
        }

        public void Start() {
            _InnerTask.Start();
            Status = OperationRunningStatus.Working;
        }

        public void Cancel() {
            _InnerCancelTokenSource.Cancel();
            Status = OperationRunningStatus.Completed;
        }

        protected abstract void PerformWork();

        protected void ReportProgress(int Progress) {
            if (onProgressReport == null)
                return;

            onProgressReport(new OperationProgressEventArg { 
                Progress = Progress, 
                TaskId = TaskId
            });
        }

        protected void TaskCompleted(RemoteOperationEventArg arg) {
            if (onOperationCompleted == null)
                return;

            onOperationCompleted(arg);
            Status = OperationRunningStatus.Completed;
        }

    }

}

Как видите, я использую delegates для выполнения требований. Как и класс Thread, класс Task не может быть унаследован, поэтому объект Task фактически составлен в моем BaseOperation. Вот фактическая реализованная операция:

using FileOnlineCore.EventArguments;
using FileOnlineCore.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FileOnlineCore.Objects;
using FileOnlineCore.Enums;

namespace FileOnlineCore.Operations {

    public class DownloadObjectOperation : BaseOperation {

        private readonly FileSystemObject _FileSystemObject;
        private readonly String _LocalFolder;

        public FileSystemObjectDownloadCompletedEventHandler onDownloadCompleted;

        public DownloadObjectOperation(FileSystemObject FileSystemObject, String LocalFolder){
            TaskType = OperationType.DownloadFile;

            _FileSystemObject = FileSystemObject;
            _LocalFolder = LocalFolder;
        }

        protected override void PerformWork() {
            try {
                ReportProgress(0);

                for (int i = 1; i <= 100; i+=5) {
                    // Check for the cancellation to be signaled
                    InnerCancelToken.ThrowIfCancellationRequested();

                    // Write out a little bit showing the progress of the task
                    //Console.WriteLine("Task {0}: {1}/100 In progress", Task.CurrentId, i + 1);
                    Thread.Sleep(50); // Simulate doing some work

                    // Report progress of the work.
                    ReportProgress(i);
                }

                // By getting here the task will RunToCompletion even if the token has been signaled.

                base.TaskCompleted(new RemoteOperationEventArg { 
                    Error = null, 
                    ResultSource = OperationResultSource.Unknown, 
                    Status = OperationExitStatus.Success, 
                    TaskId = TaskId 
                });

                this.DownloadTaskCompleted(new DownloadObjectOperationEventArg { 
                    Error = null, 
                    ResultSource = OperationResultSource.Unknown, 
                    Status = OperationExitStatus.Success, 
                    TaskId = TaskId, 
                    ObtainedFileSystemObject = null
                }, _LocalFolder, "TheFileName.Extension");
            }
            catch (OperationCanceledException exp_Canceled) {
                // Any clean up code goes here.

                base.TaskCompleted(new RemoteOperationEventArg {
                    Error = exp_Canceled,
                    ResultSource = OperationResultSource.Unknown,
                    Status = OperationExitStatus.Error,
                    TaskId = TaskId
                });

                this.DownloadTaskCompleted(new DownloadObjectOperationEventArg {
                    Error = exp_Canceled,
                    ResultSource = OperationResultSource.Unknown,
                    Status = OperationExitStatus.Error,
                    TaskId = TaskId,
                    ObtainedFileSystemObject = null
                }, _LocalFolder, "TheFileName.Extension");

                // To ensure that the calling code knows the task was cancelled
                //throw; 
            }
            catch (Exception exp) {
                // Clean up other stuff

                base.TaskCompleted(new RemoteOperationEventArg { 
                    Error = exp, 
                    ResultSource = OperationResultSource.Unknown, 
                    Status = OperationExitStatus.Error, 
                    TaskId = TaskId 
                });

                this.DownloadTaskCompleted( new DownloadObjectOperationEventArg { 
                    Error = exp, 
                    ResultSource = OperationResultSource.Unknown, 
                    Status = OperationExitStatus.Error, 
                    TaskId = TaskId 
                }, _LocalFolder, "TheFileName.Extension");

                // If the calling code also needs to know.
                //throw;
            }
        }

        protected void DownloadTaskCompleted(DownloadObjectOperationEventArg arg, String LocalFolder, String FileName) {
            if (onDownloadCompleted == null)
                return;

            onDownloadCompleted(arg, LocalFolder, FileName);
            Status = OperationRunningStatus.Completed;


        }

    }

}

Теперь вот основной класс, который предоставляет единственный метод, который, в свою очередь, выполняет всю работу по созданию объекта DownloadObjectOperation и заставляет его работать:

using FileOnlineCore.Enums;
using FileOnlineCore.Events;
using FileOnlineCore.Objects;
using FileOnlineCore.Operations;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace FileOnlineCore {

    public sealed class FileOnline {

        // Construct a task scheduler from the current SynchronizationContext (UI thread)
        private TaskScheduler OperationTaskScheduler;

        // Construct a new TaskFactory using our UI scheduler
        private TaskFactory OperationFactory;

        //Something to manage my operations
        private List<BaseOperation> OperationPool;

        //Originally here to massively cancel all operations. Epic fail so far.
        private CancellationTokenSource CancelAllTokenSource;
        private CancellationToken CancelAllToken;

        public FileOnline() {
            InitializeStore();
        }

        private void InitializeStore() {
            OperationPool = new List<BaseOperation>();      

            CancelAllTokenSource = new CancellationTokenSource();
            CancelAllToken = CancelAllTokenSource.Token;

            OperationTaskScheduler = TaskScheduler.Default;
            OperationFactory = new TaskFactory(CancelAllToken, TaskCreationOptions.PreferFairness, TaskContinuationOptions.ExecuteSynchronously, OperationTaskScheduler);

            //Low leve thrading stuff for performence optimization
            //int workerThreads, complete;
            //ThreadPool.GetMinThreads(out workerThreads, out complete);
            //Console.WriteLine("Idle worker threads: " + workerThreads);

            // Comment out this line to see the difference. With this commented out, the second iteration will be immediate
            //ThreadPool.SetMinThreads(100, complete);
        }


        public Int32 DownloadFileSystemObject(FileSystemObject FileSystemObject, String LocalFolder, Boolean Overwrite, OperationProgressReportEventHandler onOperationProgressReport, FileSystemObjectDownloadCompletedEventHandler onOperationCompleted) {

            var DownloadOp = new DownloadObjectOperation(FileSystemObject, LocalFolder) {
                onProgressReport = onOperationProgressReport,
                onDownloadCompleted = onOperationCompleted
            };

            OperationPool.Add(DownloadOp);

            OperationFactory.StartNew(() => {
                DownloadOp.Start();
            }).ContinueWith(t => {
                DownloadOp.InnerTask.Dispose(); //Exception!
                t.Dispose();
                OperationPool.Remove(DownloadOp);
                DownloadOp = null;
            });

            return DownloadOp.TaskId;
        }

        ...

        public void CancelTask(Int32 TaskId) {
            var FoundOperation = OperationPool.SingleOrDefault(where => where.TaskId == TaskId);

            if (FoundOperation != null && FoundOperation.InnerTask.Status == TaskStatus.Running && FoundOperation.Status == OperationRunningStatus.Working) {
                FoundOperation.Cancel();
            }
        }
        public void CancelAllTasks() {
            OperationPool.ForEach(Item => {
                if (Item != null && Item.InnerTask.Status == TaskStatus.Running && Item.Status == OperationRunningStatus.Working) {
                    Item.Cancel();
                }
            });
        }

    }

}

Теперь я могу просто пропустить OperationFactory (мой пользовательский TaskFactory) и запустить DownloadObjectOperation.Start() напрямую, но из того, что я видел в Интернете, TaskFatory выполняет внутреннюю балансировку нагрузки в зависимости от ThreadPool.

Если я пропущу OperationFactory, я могу легко связать Contnue с таким образом:

DownloadOp.InnerTask.ContinueWith(t => {
                t.Dispose();
                OperationPool.Remove(DownloadOp);
                DownloadOp = null;
            });

Однако, используя OperationFactory, я получаю исключение, поскольку последовательность не ожидает завершения процесса и сразу же переходит в блок ContinueWith. Теперь DownloadOp.InnerTask.Dispose(); вызывает исключение, поскольку фактическая внутренняя задача все еще выполняется.

Мне нужно изящно завершить свои задачи, так как этот API будет использоваться большим количеством людей. Если Tasks или мой BaseOperations расположены неправильно, боюсь, сервер зависнет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...