MultiThreading: ограничить количество одновременных потоков - PullRequest
4 голосов
/ 16 мая 2011

Мне нужно разработать приложение, использующее многопоточность.

По сути, у меня есть DataTable, который содержит около 200 тыс. Строк. Из каждой строки мне нужно взять поле, сравнить его с веб-страницей, а затем удалите его из таблицы данных.

Дело в том, что сервер, обслуживающий эти страницы, имеет ограничение на количество одновременных запросов. так что в максимуме я могу попросить 3 страницы одновременно.

Я хочу сделать это с помощью пула потоков, Мне даже удалось создать простое приложение, которое делает это (блокирует данные) но я не мог ограничить параллельные потоки (даже с SetMaxThreads), похоже, он просто проигнорировал ограничение.

у кого-нибудь есть что-то готовое, что-то подобное? Я хотел бы видеть.

Я пытался использовать семафоры, но столкнулся с проблемами:

        static SemaphoreSlim _sem = new SemaphoreSlim(3);    // Capacity of 3
    static List<string> records = new List<string>();

    static void Main()
    {
        records.Add("aaa");
        records.Add("bbb");
        records.Add("ccc");
        records.Add("ddd");
        records.Add("eee");
        records.Add("fff");
        records.Add("ggg");
        records.Add("iii");
        records.Add("jjj");

        for (int i = 0; i < records.Count; i++ )
        {
            new Thread(ThreadJob).Start(records[i]);
        }

        Console.WriteLine(records.Count);
        Console.ReadLine();
    }

    static void ThreadJob(object id)
    {
        Console.WriteLine(id + " wants to enter");
        _sem.Wait();
        Console.WriteLine(id + " is in!");           // Only three threads
        //Thread.Sleep(1000 * (int)id);               // can be here at
        Console.WriteLine(id + " is leaving");       // a time.

        lock (records)
        {
            records.Remove((string)id);
        }

        _sem.Release();
    }

это работает довольно хорошо, единственная проблема,

Console.WriteLine(records.count);

возвращает разные результаты. даже из-за того, что я понимаю, что это происходит, так как не все потоки закончили (я вызываю records.count до того, как все записи были удалены), я не мог найти, как ждать, пока все не закончится.

Ответы [ 4 ]

2 голосов
/ 16 мая 2011

Чтобы дождаться завершения нескольких потоков, вы можете использовать несколько EventWaitHandle и затем вызвать WaitHandle.WaitAll, чтобы заблокировать основной поток, пока не будут сигнализированы все события:

// we need to keep a list of synchronization events
var finishEvents = new List<EventWaitHandle>();

for (int i = 0; i < records.Count; i++ )
{
    // for each job, create an event and add it to the list
    var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
    finishEvents.Add(signal);

    // we need to catch the id in a separate variable
    // for the closure to work as expected
    var id = records[i];

    var thread = new Thread(() =>
        {
            // do the job
            ThreadJob(id);

            // signal the main thread
            signal.Set();
        });
}

WaitHandle.WaitAll(finishEvents.ToArray());

Поскольку большинство из этих потоков в большинстве случаев зависают, было бы лучше использовать ThreadPool в этом случае, поэтому вы можете заменить new Thread на:

    ThreadPool.QueueUserWorkItem(s =>
    {
        ThreadJob(id);
        signal.Set();
    });

Когда вы закончите с событиями, не забудьте утилизировать их:

foreach (var evt in finishEvents)
{
    evt.Dispose();
}

[Изменить]

Чтобы поместить все это в одно место, вот как должен выглядеть ваш пример кода:

static Semaphore _sem = new Semaphore(3, 3);    // Capacity of 3
static List<string> _records = new List<string>(new string[] { "aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh" });

static void Main()
{
    var finishEvents = new List<EventWaitHandle>();

    for (int i = 0; i < _records.Count; i++)
    {
        var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
        finishEvents.Add(signal);

        var id = _records[i];
        var t = new Thread(() =>
        {
            ThreadJob(id);
            signal.Set();
        });

        t.Start();
    }

    WaitHandle.WaitAll(finishEvents.ToArray());

    Console.WriteLine(_records.Count);
    Console.ReadLine();
}

static void ThreadJob(object id)
{
    Console.WriteLine(id + " wants to enter");
    _sem.WaitOne();

    Console.WriteLine(id + " is in!");
    Thread.Sleep(1000);
    Console.WriteLine(id + " is leaving");

    lock (_records)
    {
        _records.Remove((string)id);
    }

    _sem.Release();
}

(обратите внимание, что я использовал Semaphore вместо SemaphoreSlim, потому что у меня нет .NET 4 на этом компьютере, и я хотел проверить код перед обновлением ответа)

1 голос
/ 16 мая 2011

Почему бы не использовать Параллельные Расширения - это бы упростило задачу.

В любом случае, вы, вероятно, захотите взглянуть на что-то вроде семафоров. Я написал сообщение в блоге на эту тему месяц или два назад, которое может оказаться полезным: https://colinmackay.scot/2011/03/30/using-semaphores-to-restrict-access-to-resources/

0 голосов
/ 16 мая 2011

Во-первых, должен Console.WriteLine (id + "уходит");не позже, после блокировки и непосредственно перед выпуском семафора?

Что касается фактического ожидания завершения всех потоков, ответ Груо выглядит лучше и надежнее в долгосрочной перспективе, но какболее быстрое / более простое решение для этого конкретного фрагмента кода, я думаю, что вы могли бы также избежать простого вызова .Join () для всех потоков, которые вы хотите ждать, последовательно.

static List<Thread> ThreadList = new List<Thread>(); // To keep track of them

затемпри запуске потоков замените текущую новую строку Thread на:

ThreadList.Add(new Thread(ThreadJob).Start(records[i]));

, а затем непосредственно перед Console.WriteLine:

foreach( Thread t in ThreadList )
{
    t.Join();
}

Это заблокируется, если какой-либо из потоков не будетне завершайте работу, и если вы когда-нибудь захотите узнать, какие потоки еще не завершены, этот метод не будет работать.

0 голосов
/ 16 мая 2011

вы можете использовать Семафор , если вы находитесь под .net 3.5

или

SemaphoreSlim in. net 4.0

...