Как правильно убирать после отмены долго выполняющейся задачи - PullRequest
6 голосов
/ 09 марта 2011

Я создал класс, целью которого является абстрагирование контроля одновременного доступа к очереди.

Класс предназначен для создания экземпляра в одном потоке, записи в несколько потоков и последующего чтения из следующего отдельного потока.

У меня есть одна длительная задача, сгенерированная внутри класса, которая выполнит цикл блокировки и запустит событие, если элемент успешно удален из очереди.

У меня такой вопрос: является ли моя реализация отмены долгосрочной задачи И последующей очистки / сброса корректным использованием объекта CancellationTokenSource?

В идеале я бы хотел, чтобы активный объект можно было останавливать и перезапускать при сохранении доступности для добавления в очередь.

В качестве основы я использовал статью Питера Бромберга: Очередь для производителя / потребителя и BlockingCollection в C # 4.0

Код ниже:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Test
{
    public delegate void DeliverNextQueuedItemHandler<T>(T item);

public sealed class SOQueueManagerT<T> 
{

    ConcurrentQueue<T> _multiQueue;
    BlockingCollection<T> _queue;
    CancellationTokenSource _canceller;
    Task _listener = null;

    public event DeliverNextQueuedItemHandler<T> OnNextItem;

    public bool IsRunning { get; private set; }
    public int QueueSize
    {
        get
        {
            if (_queue != null)
                return _queue.Count;
            return -1;
        }
    }

    public CancellationTokenSource CancellationTokenSource
    {
        get
        {
            if (_canceller == null)
                _canceller = new CancellationTokenSource();

            return _canceller;
        }
    }

    public SOQueueManagerT()
    {
        _multiQueue = new ConcurrentQueue<T>();
        _queue = new BlockingCollection<T>(_multiQueue);

        IsRunning = false;
    }

    public void Start()
    {
        if (_listener == null)
        {


            IsRunning = true;

            _listener = Task.Factory.StartNew(() =>
            {

                while (!CancellationTokenSource.Token.IsCancellationRequested)
                {
                    T item;
                    if (_queue.TryTake(out item, 100))
                    {
                        if (OnNextItem != null)
                        {

                            OnNextItem(item);
                        }
                    }

                }
            },
            CancellationTokenSource.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }
    }

    public void Stop()
    {
        if (_listener != null)
        {
            CancellationTokenSource.Cancel();
            CleanUp();
        }
    }

    public void Add(T item)
    {
        _queue.Add(item);
    }

    private void CleanUp()
    {
        _listener.Wait(2000);
        if (_listener.IsCompleted)
        {
            IsRunning = false;
            _listener = null;
            _canceller = null;
        }
    }


 }
}

UPDATE Вот что я сделал в конце. Это не идеально, но до сих пор делает работу.

public sealed class TaskQueueManager<T> 
{
    ConcurrentQueue<T> _multiQueue;
    BlockingCollection<T> _queue;
    CancellationTokenSource _canceller;
    Task _listener = null;

    public event DeliverNextQueuedItemHandler<T> OnNextItem;

    public bool IsRunning
    {
        get
        {
            if (_listener == null)
                return false;
            else if (_listener.Status == TaskStatus.Running ||
                _listener.Status == TaskStatus.Created ||
                _listener.Status == TaskStatus.WaitingForActivation ||
                _listener.Status == TaskStatus.WaitingToRun ||
                _listener.IsCanceled)
                return true;
            else
                return false;
        }
    }
    public int QueueSize
    {
        get
        {
            if (_queue != null)
                return _queue.Count;
            return -1;
        }
    }

    public TaskQueueManager()
    {
        _multiQueue = new ConcurrentQueue<T>();
        _queue = new BlockingCollection<T>(_multiQueue);
    }

    public void Start()
    {
        if (_listener == null)
        {
            _canceller = new CancellationTokenSource();

            _listener = Task.Factory.StartNew(() =>
            {
                while (!_canceller.Token.IsCancellationRequested)
                {
                    T item;
                    if (_queue.TryTake(out item, 100))
                    {
                        if (OnNextItem != null)
                        {
                            try
                            {
                                OnNextItem(item);
                            }
                            catch (Exception e)
                            {
                                //log or call an event
                            }
                        }
                    }
                }
            },
            _canceller.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }
    }

    public void Stop()
    {
        if (_listener != null)
        {
            _canceller.Cancel();

            if (_listener.IsCanceled && !_listener.IsCompleted)
                _listener.Wait();

            _listener = null;
            _canceller = null;
        }
    }

    public void Add(T item)
    {
        if (item != null)
        {
            _queue.Add(item);
        }
        else
        {
            throw new ArgumentNullException("TaskQueueManager<" + typeof(T).Name + ">.Add item is null");
        }
    }
}

1 Ответ

1 голос
/ 10 марта 2011

Тщательное программирование - единственное, что может его сократить.Даже если вы отмените операцию, у вас может быть ожидающая операция, которая не завершается в течение модного периода времени.Это вполне может быть блокирующая операция, которая заблокирована.В этом случае ваша программа фактически не прекратит работу.

Например, если я вызываю ваш метод CleanUp несколько раз или без вызова Start сначала, у меня возникает ощущение, что это может привести к сбою.

Время ожидания 2 секунды во время очистки кажется более произвольным, чемзапланировано, и я на самом деле зашел бы так далеко, чтобы обеспечить правильное завершение работы или сбой / зависание (вы никогда не захотите оставлять параллельные вещи в неизвестном состоянии).

Кроме того, IsRunning явно установленне выводится из состояния объекта.

Для вдохновения я хотел бы, чтобы вы посмотрели на похожий класс, который я недавно написал, это шаблон производителя / потребителя, который работает в фоновом потоке.Вы можете найти этот исходный код на CodePlex .Хотя это было разработано для решения очень специфической проблемы.

Здесь отмена решается путем запроса определенного типа, который распознается только рабочим потоком и, таким образом, начинает закрываться.Это также гарантирует, что я никогда не отменю ожидающую работу, учитываются только целые единицы работы.

Чтобы немного улучшить эту ситуацию, у вас может быть отдельный таймер для текущей работы и отмена или откат незавершенной работы, если она отменена.Теперь для реализации поведения, подобного транзакции транзакции , потребуется несколько проб и ошибок, потому что вам нужно посмотреть на каждый возможный случай и спросить себя, что произойдет, если здесь произойдет сбой программы?В идеале все эти пути кода приводят к восстанавливаемому или известному состоянию, из которого вы можете возобновить свою работу.Но, как я думаю, вы уже догадались, для этого потребуется тщательное программирование и много испытаний.

...