Кодирование / Разработка общего поточно-безопасного ограничителя (то есть ограничить выполнение X () до Y много раз в секунду) - PullRequest
4 голосов
/ 20 февраля 2010

Я планирую разработать класс, чтобы ограничить выполнение функции заданным количеством в течение указанного времени, например:

  • Процесс макс. 5 файлов за 1 секунду

Он должен быть поточно-ориентированным, а снижение производительности должно быть минимальным.

Как бы вы разработали такой класс? У меня есть пара идей, но ни одна из них мне не показалась правильной.

Существует ли какой-либо известный шаблон проектирования для такой задачи? ( Я пишу .NET, но любой язык в порядке )

Внешний класс должен работать следующим образом ( при условии, что он одиночный ):

Установка:

Limiter.Instance.MaxExecutionPerSecond = 5

Затем мы вызываем это в потоках перед выполнением нашей функции и, если это требуется, блокируют поток:

Limiter.Instance.WaitIfRequired()

Ответы [ 3 ]

2 голосов
/ 20 февраля 2010

Как то так?

using Timer = System.Threading.Timer;

class Limiter{
    public static readonly Limiter Instance = new Limiter();

    Limiter(){}

    int         max;
    Semaphore   counter;
    List<Timer> timers  = new List<Timer>();

    // warning: not thread safe!
    public int MaxExecutionPerSecond{
        get{return max;}
        set{counter = new Semaphore(max = value, value);}
    }

    public void WaitIfRequired(){
        // Our semaphore starts with a count of MaxExecutionPerSecond.
        // When we call WaitOne(), it decrements the count.  If the count
        // is already zero, the call to WaitOne() will block until another
        // thread calls Release() to increment the count.
        counter.WaitOne();

        // Set a timer to increment the semaphore in one second.
        Timer t = null;
        t = new Timer(o=>{
            // Increment the semaphore.
            counter.Release();

            // We no longer need to protect this timer from the GC.
            timers.Remove(t);
            t.Dispose();
        });

        // Hold a reference to this timer to keep it from being disposed of.
        timers.Add(t);

        // Set the timer to release the semaphore in one second.
        t.Change(1000, Timeout.Infinite);
    }
}

EDIT

Следует иметь в виду, что приведенный выше код будет препятствовать тому, чтобы многие потоки запускали одновременно. Если потоки долго работают, то все равно возможно иметь много потоков , работающих одновременно. Например, если вы запускаете 5 потоков в секунду, но каждый поток работает в течение 1 секунды, то в любой данный момент времени через 2 секунды теоретически у вас будет работать 10 потоков.

Если вы хотите убедиться, что у вас никогда не запущено более 5 потоков одновременно, проще всего обойтись без пользовательского класса Limiter и просто напрямую использовать Semaphore.

const int maxThreadCount = 5;
static Semaphore counter = new Semaphore(maxThreadCount, maxThreadCount);

static void NewThread(object state){
    counter.WaitOne();

    // do something

    counter.Release();
}

Теперь это так просто, как только может быть. Однако есть одна оговорка: создание новых потоков и немедленное их сохранение обычно считается плохой идеей. Это использует системные ресурсы для создания потока, который ничего не делает. Лучше ставить запросы в очередь и запускать новые потоки (или еще лучше использовать потоки пула потоков), чтобы обрабатывать их только тогда, когда они имеют право на запуск. Это сложнее и сложнее понять. В зависимости от дизайна может потребоваться дополнительный поток для планировщика / управления. Примерно так:

class Limiter{
    class WorkData{
        readonly ParameterizedThreadStart action;
        readonly object                   data;

        public ParameterizedThreadStart Action{get{return action;}}
        public object                   Data  {get{return data;}}

        public WorkData(ParameterizedThreadStart action, object data){
            this.action = action;
            this.data   = data;
        }
    }

    readonly Semaphore       threadCount;
    readonly Queue<WorkData> workQueue   = new Queue<WorkData>();
    readonly Semaphore       queueCount  = new Semaphore(0, int.MaxValue);

    public Limiter(int maxThreadCount){
        threadCount = new Semaphore(maxThreadCount, maxThreadCount);
        Thread t = new Thread(StartWorkItems);
        t.IsBackground = true;
        t.Start();
    }

    void StartWorkItems(object ignored){
        while(queueCount.WaitOne() && threadCount.WaitOne()){
            WorkData wd;
            lock(workQueue)
                wd = workQueue.Dequeue();

            ThreadPool.QueueUserWorkItem(DoWork, wd);
        }
    }
    void DoWork(object state){
        WorkData wd = (WorkData)state;
        wd.Action(wd.Data);
        counter.Release();
    }

    public void QueueWork(ParameterizedThreadStart action, object data){
        lock(workQueue)
            workQueue.Enqueue(new WorkData(action, data));
        queueCount.Release();
    }
}

В этом классе я удалил свойство singleton и дал конструктору параметр maxThreadCount. Это позволяет избежать отсутствия безопасности потока в свойстве первого класса. Есть и другие способы обеспечения безопасности потоков, но это было самое простое.

1 голос
/ 20 февраля 2010

Для того разрешения, которое вы ищете, DateTime.Now - отличные часы и очень дешевые. Он обновляется с точностью чуть более 15 мсек. Вот пример, вызовите метод Operation () непосредственно перед выполнением операции:

using System;

class Throttle {
  private int mTrigger;
  private int mOperations;
  private DateTime mStart;

  public Throttle(int maxOperationsPerSecond) {
    if (maxOperationsPerSecond < 1) throw new ArgumentException();
    mTrigger = maxOperationsPerSecond;
  }
  public void Operation() {
    mOperations += 1;
    if (mOperations > mTrigger) {
      TimeSpan span = DateTime.UtcNow - mStart;
      if (span.TotalMilliseconds < 1000)
        System.Threading.Thread.Sleep(1000 - (int)span.TotalMilliseconds);
      mOperations = 1;
    }
    if (mOperations == 1) mStart = DateTime.UtcNow;
  }
}

Создайте экземпляр класса в вашем потоке, не делитесь им.

0 голосов
/ 20 февраля 2010

Я предполагаю, что получение текущего времени обходится недорого по сравнению с тем, что будет делать функция.

В этом случае, я сделал что-то подобное в Scala, и шаблон дизайна будет выглядеть примерно так:

  • WaitIfRequired должен блокироваться, если уже выполняется. (Это будет "синхронизированный метод" в Java; я не уверен, что такое эквивалент .NET.)
  • Поддерживать очередь, в которую вызывался WaitIfRequired.
  • Если длина очереди превышает MaxExecutionPerSecond, сокращайте ее до тех пор, пока она не станет длиннее MaxExecutionPerSecond.
  • Снимите верхнюю часть очереди, если она длинна MaxExecutionPerSecond. Если с того времени время больше секунды, вставьте текущее время в конец очереди и немедленно вернитесь; достаточно времени прошло. Если оно меньше секунды, перед сном и возвратом спите столько времени, сколько нужно для того, чтобы она была секунда (т. Е. Прошло 1 секунда).

Вот и все.

Вы можете поиграть с этим, запрашивая не более N вызовов за время T, заменив «одну секунду» на T.

Теперь, если у вас нет синхронизированных методов, все становится немного интереснее (но только немного). Затем вам потребуется внешний механизм блокировки, чтобы убедиться, что только один поток за раз читает и ожидает.

...