Надежный метод отслеживания неудачных работников с ThreadPool - PullRequest
2 голосов
/ 02 февраля 2010

Я ищу хороший метод отслеживания (подсчета), какие работники потерпели неудачу при постановке в очередь с Threadpool и использования WaitHandle.WaitAll () для завершения всех потоков.техника или есть более надежная стратегия?

1 Ответ

1 голос
/ 02 февраля 2010

Хорошо, вот подход, который вы могли бы использовать. Я инкапсулировал данные, которые мы хотим отслеживать, в класс TrackedWorkers. В этом классе есть конструктор, который позволяет вам указать, сколько рабочих будет работать. Затем рабочие запускаются с использованием LaunchWorkers, для которого требуется делегат, который ест object и возвращает bool. object представляет ввод для работника, а bool представляет успех или неудачу, в зависимости от того, * true или false - возвращаемое значение, соответственно.

Итак, в основном то, что мы делаем, у нас есть массив для отслеживания рабочего состояния. Мы запускаем работников и устанавливаем статус, соответствующий этому работнику, в зависимости от возвращаемого значения от работника. Когда работник возвращается, мы устанавливаем AutoResetEvent и WaitHandle.WaitAll для всех устанавливаемых AutoResetEvents.

Обратите внимание, что существует вложенный класс для отслеживания работы (делегата), которую должен выполнять рабочий, ввода для этой работы и ID, используемого для установки статуса AutoResetEvent, соответствующего этому потоку.

Обратите особое внимание на то, что, как только работа будет завершена, мы не будем упоминать ни рабочего делегата func, ни input. Это важно, чтобы мы случайно не предотвратили сбор мусора.

Существуют методы получения статуса определенного работника, а также все индексы работников, которые преуспели, и все индексы работников, которые потерпели неудачу.

Последнее замечание: я не считаю этот код готовым. Это всего лишь набросок подхода, который я бы выбрал. Вы должны позаботиться о том, чтобы добавить тестирование, обработку исключений и другие подобные детали.

class TrackedWorkers {
    class WorkerState {
        public object Input { get; private set; }
        public int ID { get; private set; }
        public Func<object, bool> Func { get; private set; }
        public WorkerState(Func<object, bool> func, object input, int id) {
            Func = func;
            Input = input;
            ID = id;
        }
    }

    AutoResetEvent[] events;
    bool[] statuses;
    bool _workComplete;
    int _number;

    public TrackedWorkers(int number) {
        if (number <= 0 || number > 64) {
            throw new ArgumentOutOfRangeException(
                "number",
                "number must be positive and at most 64"
            );
        }
        this._number = number;
        events = new AutoResetEvent[number];
        statuses = new bool[number];
        _workComplete = false;
    }

    void Initialize() {
        _workComplete = false;
        for (int i = 0; i < _number; i++) {
            events[i] = new AutoResetEvent(false);
            statuses[i] = true;
        }
    }

    void DoWork(object state) {
        WorkerState ws = (WorkerState)state;
        statuses[ws.ID] = ws.Func(ws.Input);
        events[ws.ID].Set();
    }

    public void LaunchWorkers(Func<object, bool> func, object[] inputs) {
        Initialize();
        for (int i = 0; i < _number; i++) {
            WorkerState ws = new WorkerState(func, inputs[i], i);
            ThreadPool.QueueUserWorkItem(this.DoWork, ws);
        }
        WaitHandle.WaitAll(events);
        _workComplete = true;
    }

    void ThrowIfWorkIsNotDone() {
        if (!_workComplete) {
            throw new InvalidOperationException("work not complete");
        }
    }

    public bool GetWorkerStatus(int i) {
        ThrowIfWorkIsNotDone();
        return statuses[i];
    }

    public IEnumerable<int> SuccessfulWorkers {
        get {
            return WorkersWhere(b => b);
        }
    }

    public IEnumerable<int> FailedWorkers {
        get {
            return WorkersWhere(b => !b);
        }
    }

    IEnumerable<int> WorkersWhere(Predicate<bool> predicate) {
        ThrowIfWorkIsNotDone();
        for (int i = 0; i < _number; i++) {
            if (predicate(statuses[i])) {
                yield return i;
            }
        }
    }
}

Пример использования:

class Program {
    static Random rg = new Random();
    static object lockObject = new object();
    static void Main(string[] args) {
        int count = 64;
        Pair[] pairs = new Pair[count];
        for(int i = 0; i < count; i++) {
            pairs[i] = new Pair(i, 2 * i);
        }
        TrackedWorkers workers = new TrackedWorkers(count);
        workers.LaunchWorkers(SleepAndAdd, pairs.Cast<object>().ToArray());
        Console.WriteLine(
            "Number successful: {0}",
            workers.SuccessfulWorkers.Count()
        );
        Console.WriteLine(
            "Number failed: {0}",
            workers.FailedWorkers.Count()
        );
    }
    static bool SleepAndAdd(object o) {
        Pair pair = (Pair)o;
        int timeout;
        double d;
        lock (lockObject) {
            timeout = rg.Next(1000);
            d = rg.NextDouble();
        }
        Thread.Sleep(timeout);
        bool success = d < 0.5;
        if (success) {
            Console.WriteLine(pair.First + pair.Second);
        }
        return (success);

    }
}

Вышеуказанная программа запустит шестьдесят четыре потока. Задача i th состоит в добавлении чисел i и 2 * i и выводе результата на консоль. Тем не менее, я добавил случайное количество сна (менее одной секунды), чтобы имитировать занятость, и подбрасываю монету, чтобы определить успех или неудачу потока. Те, кто преуспевает, печатают сумму, с которой им поручено, и возвращают true Те, которые потерпели неудачу, ничего не печатают и возвращают false.

Здесь я использовал

struct Pair {
    public int First { get; private set; }
    public int Second { get; private set; }
    public Pair(int first, int second) : this() {
        this.First = first;
        this.Second = second;
    }
}
...