Фоновая обработка элементов в коллекции + добавление новых элементов (с использованием пула потоков) - PullRequest
0 голосов
/ 26 января 2012

В моем приложении есть объект, который выполняет обработку элементов коллекции в фоновом потоке. Когда объект создан, фоновая обработка всех существующих элементов в коллекции запускается с использованием пула потоков:

class CollectionProcessor
{
    public CollectionProcessor()
    {
        // Not actually called during the constructor just put it here to simplify the code sample
        Action process = new Action(this.Process);
        createIndex.BeginInvoke(true, ar => process.EndInvoke(ar), null);
    }

    void Process()
    {
        for (int i = 0; i < this.items.Count; i++)
        {
            this.ProcessItem(this.items[i]);
        }
    }
}

Существует некоторый дополнительный код для уведомлений об обратном вызове, но это в основном суть этого.

Новые элементы могут быть добавлены в эту коллекцию в любое время, и мне нужно убедиться, что эти новые элементы обработаны - уведомление о новых элементах предоставляется событием, которое запускается после того, как элементы уже добавлены в коллекцию. В обработчике событий для этого события мне нужно асинхронно возобновить обработку новых элементов в коллекции, а также:

  • Обеспечение того, чтобы я не обрабатывал один и тот же элемент дважды
  • Обеспечение обработки товаров в правильном порядке
  • Как избежать постановки в очередь большого количества заблокированных фоновых задач

Я также хочу добиться этого с помощью пула потоков вместо использования выделенного потока - Как мне это сделать? Очевидно, предполагается, что доступ к this.items является потокобезопасным.

1 Ответ

0 голосов
/ 26 января 2012

Мне кажется, я нашел достаточно аккуратный способ сделать это. Ключевым моментом является то, что если бы у меня был выделенный фоновый поток, выполняющий эту обработку, то решение довольно простое и может выглядеть примерно так:

AutoResetEvent ev = new AutoResetEvent(false);

// Called on a background thread
void ThreadProc()
{
    int lastProcessed = 0;
    while (true)
    {
        // Perform our processing as before
        for (int i = lastProcessed; i < this.items.Count; i++)
        {
            this.ProcessItem(this.items[i]);
        }

        // We have processed all items currently in the list, wait for some more
        ev.WaitOne();
    }
}

void OnNewItems()
{
    ev.Set();
}

Отсутствующей ссылкой является метод ThreadPool.RegisterWaitForSingleObject , который позволяет преобразовать его в пул потоков вместо выделенного потока:

int lastProcessed = 0;

void StartProcessing()
{
    ThreadPool.RegisterWaitForSingleObject(
        this.ev,
        new WaitOrTimerCallback(WaitProc),
        null,   // All state stored in the class instance itself
        -1,     // Always wait indefinitely for new items
        true    // Only execute once - each callback registers a new wait handle ensuring
                // that a maximum of 1 task is running Process at any one time
    );
}

void WaitProc(object state, bool timedOut)
{
    // Perform our processing as before
    for (int i = lastProcessed; i < this.items.Count; i++)
    {
        this.ProcessItem(this.items[i]);
    }

    // We have processed all items currently in the list, wait for some more
    this.StartProcessing();
}

Это устанавливает цикл так же, как и раньше, за исключением того, что мы не блокируем поток, ожидающий событие сброса.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...