Thread.Sleep блокирует параллельное выполнение задач - PullRequest
4 голосов
/ 27 сентября 2011

Я вызываю рабочий метод, который вызывает базу данных, которая затем выполняет итерацию и возвращает значения для параллельной обработки. Чтобы он не забивал базу данных, у меня есть Thread.Sleep, чтобы приостановить выполнение в БД. Тем не менее, это, кажется, блокирует выполнение, которое все еще происходит в Parallel.ForEach. Каков наилучший способ предотвратить блокировку?

private void ProcessWorkItems()
{
    _cancellation = new CancellationTokenSource();
    _cancellation.Token.Register(() => WorkItemRepository.ResetAbandonedWorkItems());

    Task.Factory.StartNew(() =>
        Parallel.ForEach(GetWorkItems().AsParallel().WithDegreeOfParallelism(10), workItem =>
        {
            var x = ItemFactory(workItem);
            x.doWork();
        }), _cancellation.Token);
}

private IEnumerable<IAnalysisServiceWorkItem> GetWorkItems()
{
    while (!_cancellation.IsCancellationRequested)
    {
        var workItems = WorkItemRepository.GetItemList(); //database call

        workItems.ForEach(item =>
        {
            item.QueueWorkItem(WorkItemRepository);
        });

        foreach (var item in workItems)
        {
            yield return item;
        }

        if (workItems.Count == 0)
        {
            Thread.Sleep(30000); //sleep this thread for 30 seconds if no work items.
        }
    }

    yield break;
}

Edit: Я изменил его, чтобы включить ответ, и он все еще не работает, как я ожидаю. Я добавил .AsParallel (). WithDegreeOfParallelism (10) в вызов GetWorkItems (). Являются ли мои ожидания неверными, когда я думаю, что Parallel должен продолжать выполняться, даже если базовый поток спит?

Пример: У меня 15 предметов, он перебирает и берет 10 предметов и запускает их. Когда каждый из них завершает работу, он запрашивает другого у GetWorkItems, пока не попытается запросить 16-й элемент. В этот момент он должен прекратить попытки захватить больше предметов, но должен продолжить обработку предметов 11-15, пока они не будут завершены. Это как параллель должна работать? Потому что в настоящее время он этого не делает. Сейчас он выполняет 6, блокирует последующие 10, все еще работающие в Parallel.ForEach.

Ответы [ 4 ]

8 голосов
/ 27 сентября 2011

Я бы предложил создать BlockingCollection (очередь) рабочих элементов и таймер, который вызывает базу данных каждые 30 секунд для ее заполнения.Что-то вроде:

BlockingCollection<WorkItem> WorkItems = new BlockingCollection<WorkItem>();

И при инициализации:

System.Threading.Timer WorkItemTimer = new Timer((s) =>
    {
        var items = WorkItemRepository.GetItemList(); //database call
        foreach (var item in items)
        {
            WorkItems.Add(item);
        }
    }, null, 30000, 30000);

Это будет запрашивать базу данных для элементов каждые 30 секунд.

Для планирования рабочих элементов, которые будут обрабатыватьсяУ вас есть несколько разных решений.Самое близкое к тому, что у вас есть, это:

WorkItem item;

while (WorkItems.TryTake(out item, Timeout.Infinite, _cancellation))
{
    Task.Factory.StartNew((s) =>
        {
            var myItem = (WorkItem)s;
            // process here
        }, item);
}

Это устраняет блокировки в любом из потоков и позволяет TPL решить, как лучше распределить параллельные задачи.

РЕДАКТИРОВАТЬ:

На самом деле, ближе к тому, что у вас есть:

foreach (var item in WorkItems.GetConsumingEnumerable(_cancellation))
{
    // start task to process item
}

Вы можете использовать:

Parallel.Foreach(WorkItems.GetConsumingEnumerable(_cancellation).AsParallel ...

Я не знаю, будет ли это работать или какЧто ж.Может стоит попробовать.,.

END OF EDIT

В общем, я предлагаю вам рассматривать это как приложение производителя / потребителя, причем производитель является потоком, который периодически запрашивает у базы данных новые элементы.,Мой пример запрашивает базу данных один раз каждые N (в нашем случае 30) секунд, что будет хорошо работать, если в среднем вы будете очищать свою рабочую очередь каждые 30 секунд.Это даст среднюю задержку менее минуты с момента публикации элемента в базе данных до получения результатов.

Вы можете уменьшить частоту опроса (и, следовательно, задержку), но это приведет кбольше трафика базы данных.

С этим вы тоже можете полюбить.Например, если вы опрашиваете базу данных через 30 секунд и получаете огромное количество элементов, то, скорее всего, вы получите больше в ближайшее время, и вам захочется повторить опрос через 15 секунд (или меньше).И наоборот, если вы опрашиваете базу данных через 30 секунд и ничего не получите, то, вероятно, вы сможете подождать, прежде чем снова начать опрос.

Вы можете настроить этот тип адаптивного опроса, используя таймер с одним выстрелом.То есть вы указываете -1 для последнего параметра при создании таймера, который вызывает его срабатывание только один раз.Ваш обратный вызов таймера определяет, как долго ждать до следующего опроса, и вызывает Timer.Change, чтобы инициализировать таймер новым значением.

3 голосов
/ 27 сентября 2011

Вы можете использовать метод расширения .WithDegreeOfParallelism () , чтобы заставить PLinq запускать задачи одновременно.В разделе C # Threading Handbook

есть хороший пример в разделе *1003* Call Blocking или I / O Intensive .
2 голосов
/ 27 сентября 2011

Возможно, вы нарушаете правила Разделителя.

Поскольку вы передаете IEnumerable, Parallel.ForEach будет использовать Разделитель чанков, который может попытаться получить сразу несколько элементов из перечисления в чанке.,Но ваш IEnumerable.MoveNext может спать, что расстраивает вещи.

Вы можете написать свой собственный Partitioner, который возвращает один элемент за раз, но в любом случае, я думаю, что подход производителя / потребителя, такой как предложение Джима Мишелябудет работать лучше.

0 голосов
/ 27 сентября 2011

Чего вы пытаетесь достичь со сном? Из того, что я могу сказать, вы пытаетесь избежать колотящих вызовов базы данных. Я не знаю лучшего способа сделать это, но в идеале кажется, что ваш GetItemList вызов будет блокироваться, пока данные не будут доступны для обработки.

...