Как бы вы реализовали эту функциональность "WorkerChain" в .NET? - PullRequest
2 голосов
/ 24 апреля 2010

РЕДАКТИРОВАТЬ : Мне слишком поздно пришло в голову (?), Что весь код, который я разместил в своем первом обновлении этого вопроса, был слишком большим для большинства читателей. Я на самом деле пошел вперед и написал пост в блоге на эту тему для всех, кто хочет его прочитать.

А пока я оставил исходный вопрос на месте, чтобы кратко взглянуть на проблему, которую я хотел бы решить.

Я также просто отмечу, что код, который я разместил (в своем блоге), до сих пор довольно хорошо выдерживал тестирование. Но я все еще интересуюсь любыми отзывами, которые люди хотят дать мне о том, насколько он чист / надежен / эффективен.

* Мне нравится, как это слово на самом деле не означает, что мы думаем , но мы, разработчики, используем его все время.


Оригинальный вопрос

Извините за смутное название вопроса - не уверен, как кратко изложить то, что я спрашиваю ниже. (Если кто-то с правами на редактирование может придумать более описательный заголовок, смело меняйте его.)

Мне нужно такое поведение. Я предполагаю рабочий класс, который принимает одну делегатскую задачу в своем конструкторе (для простоты я бы сделал его неизменным - больше задач не может быть добавлено после создания экземпляра). Я назову эту задачу T. Класс должен иметь простой метод, например, GetToWork, который будет демонстрировать такое поведение:

  1. Если работник не в данный момент работает T, то он начнет делать это прямо сейчас.
  2. Если рабочий является , в настоящий момент работающим T, то после его завершения он сразу же начнет T.
  3. GetToWork можно вызывать любое количество раз, пока рабочий работает T; простое правило состоит в том, что во время любого выполнения T, если GetToWork был вызван хотя бы один раз, , T будет запущен снова после завершения (и затем, если GetToWork вызывается при T работает это время, оно повторится снова и т. д.).

Теперь, это довольно просто с булевым переключателем. Но этот класс должен быть поточно-ориентированным , и я имею в виду, что вышеприведенные шаги 1 и 2 должны включать атомарные операции (по крайней мере, я так думаю).

Добавлен слой сложности. Мне нужен класс "цепочки рабочих", который будет состоять из многих из этих рабочих, связанных вместе. Как только первый рабочий завершает работу, он по существу вызывает GetToWork для рабочего после it; Между тем, если вызывается его собственный GetToWork, он также перезапускается. Логический вызов GetToWork в цепочке по сути аналогичен вызову GetToWork в первом работнике в цепочке (я бы полностью хотел, чтобы работники цепочки не были публично доступны) .

Один из способов представить, как поведет себя эта гипотетическая «цепочка рабочих», - это сравнить ее с командой в эстафете. Предположим, есть четыре бегуна, от W1 до W4, и пусть цепочка будет называться C. Если я позвоню C.StartWork(), что должно произойти, это:

  1. Если W1 находится в своей начальной точке (т.е. ничего не делает), он начнет бежать к W2.
  2. Если W1 уже уже бежит к W2 (т.е. выполняет свою задачу), то, как только он достигнет W2, он подаст сигнал W2, чтобы начать, немедленно вернуться к его начальная точка и, поскольку StartWork был вызван, снова начинайте бежать к W2.
  3. Когда W1 достигнет начальной точки W2, он немедленно вернется к своей исходной точке.
    1. Если W2 просто сидит, он немедленно побежит к W3.
    2. Если W2 уже движется в направлении W3, то W2 просто пойдет снова, когда достигнет W3 и вернется к своей начальной точке.

Вышесказанное, вероятно, немного запутано и плохо написано.Но, надеюсь, вы получите основную идею.Очевидно, что эти рабочие будут работать в своих собственных потоках.

Кроме того, я полагаю, возможно, эта функциональность уже где-то существует?Если это так, определенно дайте мне знать!

Ответы [ 3 ]

1 голос
/ 24 апреля 2010

Наивная реализация, от которой вы можете получить некоторый пробег.

Примечание:

Насколько я понимаю, скалярные типы, то есть флаги bool, управляющие выполнением, имеют атомарное назначение, делающее их потокобезопасными, как вам и нужно в этом сценарии.

Существуют гораздо более сложные возможности, связанные с семафорами и другими стратегиями, но если все работает просто ...

using System;
using System.Threading;

namespace FlaggedWorkerChain
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            FlaggedChainedWorker innerWorker = new FlaggedChainedWorker("inner", () => Thread.Sleep(1000), null);
            FlaggedChainedWorker outerWorker = new FlaggedChainedWorker("outer", () => Thread.Sleep(500), innerWorker);

            Thread t = new Thread(outerWorker.GetToWork);
            t.Start();

            // flag outer to do work again
            outerWorker.GetToWork();

            Console.WriteLine("press the any key");
            Console.ReadKey();
        }
    }

    public sealed class FlaggedChainedWorker
    {
        private readonly string _id;
        private readonly FlaggedChainedWorker _innerWorker;
        private readonly Action _work;
        private bool _busy;
        private bool _flagged;

        public FlaggedChainedWorker(string id, Action work, FlaggedChainedWorker innerWorker)
        {
            _id = id;
            _work = work;
            _innerWorker = innerWorker;
        }

        public void GetToWork()
        {
            if (_busy)
            {
                _flagged = true;
                return;
            }

            do
            {
                _flagged = false;
                _busy = true;
                Console.WriteLine(String.Format("{0} begin", _id));

                _work.Invoke();

                if (_innerWorker != null)
                {
                    _innerWorker.GetToWork();
                }
                Console.WriteLine(String.Format("{0} end", _id));

                _busy = false;
            } while (_flagged);
        }
    }
}
1 голос
/ 26 апреля 2010

Мне кажется, что вы слишком усложняете это. Я написал эти "конвейерные" классы раньше; все, что вам нужно - это очередь рабочих, каждый из которых имеет дескриптор ожидания, который сигнализируется после завершения действия.

public class Pipeline : IDisposable
{
    private readonly IEnumerable<Stage> stages;

    public Pipeline(IEnumerable<Action> actions)
    {
        if (actions == null)
            throw new ArgumentNullException("actions");
        stages = actions.Select(a => new Stage(a)).ToList();
    }

    public Pipeline(params Action[] actions)
        : this(actions as IEnumerable<Action>)
    {
    }

    public void Dispose()
    {
        foreach (Stage stage in stages)
            stage.Dispose();
    }

    public void Start()
    {
        foreach (Stage currentStage in stages)
            currentStage.Execute();
    }

    class Stage : IDisposable
    {
        private readonly Action action;
        private readonly EventWaitHandle readyEvent;

        public Stage(Action action)
        {
            this.action = action;
            this.readyEvent = new AutoResetEvent(true);
        }

        public void Dispose()
        {
            readyEvent.Close();
        }

        public void Execute()
        {
            readyEvent.WaitOne();
            action();
            readyEvent.Set();
        }
    }
}

А вот тестовая программа, которую вы можете использовать для проверки того, что действия всегда выполняются в правильном порядке и одновременно может выполняться только одно из тех же действий:

class Program
{
    static void Main(string[] args)
    {
        Action firstAction = GetTestAction(1);
        Action secondAction = GetTestAction(2);
        Action thirdAction = GetTestAction(3);
        Pipeline pipeline = new Pipeline(firstAction, secondAction, thirdAction);
        for (int i = 0; i < 10; i++)
        {
            ThreadPool.QueueUserWorkItem(s => pipeline.Start());
        }
    }

    static Action GetTestAction(int index)
    {
        return () =>
        {
            Console.WriteLine("Action started: {0}", index);
            Thread.Sleep(100);
            Console.WriteLine("Action finished: {0}", index);
        };
    }
}

Короткий, простой, полностью поточно-ориентированный.

Если по какой-то причине вам нужно начать работать с определенного шага в цепочке, тогда вы можете просто добавить перегрузку для Start:

public void Start(int index)
{
    foreach (Stage currentStage in stages.Skip(index + 1))
        currentStage.Execute();
}

Редактировать

Основываясь на комментариях, я думаю, что несколько небольших изменений во внутреннем Stage классе должно быть достаточно, чтобы получить желаемое поведение. Нам просто нужно добавить событие «в очереди» в дополнение к событию «готово».

    class Stage : IDisposable
    {
        private readonly Action action;
        private readonly EventWaitHandle readyEvent;
        private readonly EventWaitHandle queuedEvent;

        public Stage(Action action)
        {
            this.action = action;
            this.readyEvent = new AutoResetEvent(true);
            this.queuedEvent = new AutoResetEvent(true);
        }

        public void Dispose()
        {
            readyEvent.Close();
        }

        private bool CanExecute()
        {
            if (readyEvent.WaitOne(0, true))
                return true;
            if (!queuedEvent.WaitOne(0, true))
                return false;
            readyEvent.WaitOne();
            queuedEvent.Set();
            return true;
        }

        public bool Execute()
        {
            if (!CanExecute())
                return false;
            action();
            readyEvent.Set();
            return true;
        }
    }

Также измените метод конвейера Start, чтобы прервать его, если этап не может быть выполнен (т.е. уже поставлен в очередь):

public void Start(int index)
{
    foreach (Stage currentStage in stages.Skip(index + 1))
        if (!currentStage.Execute())
            break;
}

Концепция здесь довольно проста, опять же:

  • Сцена сначала пытается немедленно получить состояние готовности. Если это удается, то он начинает работать.
  • Если ему не удается получить состояние готовности (т. Е. Задача уже выполняется), то он пытается получить состояние в очереди.
    • Если он получает состояние в очереди, он ждет, пока состояние готовности станет доступным, и затем освобождает состояние в очереди.
    • Если он также не может получить состояние в очереди, он сдается.

Я перечитал ваш вопрос и комментарии снова, и я почти уверен, что это именно то, что вы пытаетесь сделать, и дает лучший компромисс между безопасностью, пропускной способностью и регулированием.

Поскольку для ответа ThreadPool может потребоваться некоторое время, следует увеличить задержку в тестовой программе до 1000 вместо 100, если вы действительно хотите, чтобы происходили "пропуски".

1 голос
/ 24 апреля 2010

Использовать семафоры.Каждый работник является потоком со следующим кодом (псевдокод):

WHILE(TRUE)
    WAIT_FOR_SEMAPHORE(WORKER_ID) //The semaphore for the current worker
    RESET_SEMAPHORE(WORKER_ID)
    /* DO WORK */
    POST_SEMAPHORE(NEXT_WORKER_ID) //The semaphore for the next worker
END

Ненулевой семафор означает, что кто-то сигнализировал текущему потоку о выполнении работы.Получив ненулевой семафор в строке ввода, он сбрасывает семафор (помеченный как никто не сообщил), выполняет работу (тем временем семафор может быть снова опубликован) и публикует семафор для следующего работника.История повторяется в следующем (ых) работнике (ах).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...