Поток для распараллеливания синхронного ввода-вывода в .Net - PullRequest
0 голосов
/ 21 января 2010

В настоящее время я нахожусь в процессе разработки системы обмена сообщениями для моего приложения (которое использует AMQP на сервере через RabbitMQ). Будет несколько случаев, когда метод может получать данные из нескольких источников одновременно (т. Е. Необязательно выполнять последовательные запросы).

Первоначально я собирался использовать ThreadPool и QueueUserWorkItem для каждого отдельного запроса в методе, а затем каким-то образом объединять их. Это может быть проблематично, потому что несколько различных компонентов приложения могут сделать это одновременно, и каждый компонент может иметь большое количество параллельных запросов, которые могут привести к истощению ThreadPool.

Есть ли более эффективный / действенный способ сделать это?

1 Ответ

2 голосов
/ 21 января 2010

Это нормально, чтобы подчеркнуть пул потоков. Вы можете бросить на него кучу рабочих элементов - сотни, тысячи - и просто позволить ему порвать. Не уверен, что вы подразумеваете под "голодать". Если нет набора рабочих элементов, которые должны быть расставлены по-другому, вам, вероятно, не нужно беспокоиться о голоде.

Если вы используете QUWI, вам нужно выяснить, как объединить распараллеленные результаты обратно в один.


Звучит так, будто вы делаете карту / уменьшите подход. Вот функция быстрой карты, которая использует QUWI, и пример того, как ее использовать.

public static IEnumerable<T2> Map_QUWI<T, T2>(List<T> inputs, Func<T, T2> fn)
{
    int c = inputs.Count;
    if (c == 0) return null;
    T2[] result = new T2[c];
    if (c == 1)
    {
        // only one input - perform the work on main thread
        result[0] = fn(inputs[0]);
        return result;
    }

    using (ManualResetEvent done = new ManualResetEvent(false))
    {
        int countdown = inputs.Count;
        WaitCallback cb = delegate (Object obj)
            {
                int ix = (int)obj;
                result[ix] = fn(inputs[ix]);
                if (Interlocked.Decrement(ref countdown) == 0)
                    done.Set(); // signal all done
            };

        // queue up all workitems
        for (int i = 0; i < c; i++)
            ThreadPool.QueueUserWorkItem(cb,i);

        // Wait for done.Set(), which happens in the WaitCallback
        // when the last workitem is completed.
        done.WaitOne();
    }

    return result;
}

Пример использования:

// returns the number of prime numbers, less than or equal to x
private int NumberOfPrimesLessThanOrEqualTo(int x)
{
    int count= 0;
    int n = x;
    if (n>=2) count++;
    if (x%2==0) n--;
    if (n>0)
    {
        do
        {
            if (IsPrime(n)) count++;
            n-=2;
        } while (n>0);
    }
    return count;
}


private void Demo()
{
    var list = new List<int>(new int[] {2,4,8,16,32,64,128,256,512,1024,2048,
                                        4096,8192,16384,32768,65536,131072});
    Func<int,int> fn = NumberOfPrimesLessThanOrEqualTo;
    var result= Map_QUWI(list, fn);
    (new List<int>(result)).ForEach(System.Console.WriteLine);
}
...