Многопоточный загрузчик в вопросе C # - PullRequest
0 голосов
/ 06 мая 2010

В настоящее время у меня есть многопоточный класс загрузчика, который использует HttpWebRequest / Response. Все работает отлично, это супер быстро, НО .. проблема в том, что данные должны быть переданы во время их загрузки в другое приложение. Это означает, что он должен быть передан в правильном порядке, сначала первый чанк, а затем следующий в очереди. В настоящее время мой класс загрузчика является sync, а Download () возвращает byte []. В моем многопоточном асинхронном классе я создаю, например, список с 4 пустыми элементами (для слотов) и передаю каждый индекс слота каждому потоку с помощью функции Download (). Это имитирует синхронизацию, но это не то, что мне нужно. Как мне поступить с очередью, чтобы убедиться, что данные передаются в потоковом режиме, как только начинается загрузка первого чанка.

Ответы [ 3 ]

2 голосов
/ 06 мая 2010

Если ваш вопрос касается того, как определить, какой поток загружает первый блок и когда этот первый блок готов к использованию, используйте событие для каждого потока и отслеживайте, какие блоки вы назначили каким потокам. Отслеживайте, какое событие вы передаете первому потоку (который будет загружать первый блок данных), событие, которое вы передаете второму потоку (для второго блока данных) и т. Д. Иметь основной поток или другой фоновый поток (чтобы не блокировать поток пользовательского интерфейса), дождитесь первого события. Когда первый поток заканчивает загрузку своего чанка, он устанавливает / сигнализирует о первом событии. Поток, который ожидает, затем проснется и может использовать первый кусок данных.

Другие потоки загрузки могут делать то же самое, сигнализируя о соответствующих событиях по завершении. Используйте событие ручного сброса, чтобы событие оставалось сигнальным, даже если никто не ждет его. Когда поток, которому нужны блоки данных по порядку, заканчивает обработку первого блока данных, он может ожидать 2-го события. Если о 2-м событии уже было сообщено, то ожидание немедленно вернется, и поток может начать обработку 2-го блока данных.

Для очень большой загрузки вы можете повторно использовать события и потоки в циклическом порядке. Порядок, в котором они завершаются, не важен, если поток, который использует блоки данных, использует их по порядку и ожидает соответствующих событий в порядке.

Если вы сообразительны и осторожны, вы, вероятно, можете сделать все это, используя только одно событие: создать глобальный массив указателей / объектов чанков данных, изначально равных нулю, рабочие потоки загружают чанки данных и назначают готовые чанки их соответствующий слот в глобальном массиве, а затем сигнализировать общее событие. Поток потребителя хранит счетчик порций данных, чтобы он знал, какой блок данных ему необходимо обработать следующим, ожидает общее событие и, когда он получает сигнал, смотрит на следующий слот в глобальном массиве, чтобы увидеть, появились ли там данные. Если в следующем слоте по-прежнему нет данных, потребительский поток возвращается к ожиданию события. Вам также понадобится способ, чтобы рабочие потоки знали, какой блок данных они должны загрузить следующим - глобального счетчика, защищенного мьютексом или получающего доступ с использованием interlockedadd / exchange, будет достаточно. Каждый рабочий поток увеличивает глобальный счетчик и загружает этот номер блока данных и присваивает результат n-му слоту в глобальном списке блоков данных.

0 голосов
/ 06 мая 2010

Можете ли вы показать код, где вы выполняете загрузку, и код, с которого вы запускаете несколько асинхронных потоков?

Возможно, я не совсем понимаю ваш сценарий, но на вашем месте я бы использовал Async (BeginRead в responseStream). Тогда я бы сделал следующее ....

void StartReading(Stream responseStream)
{
    byte [] buffer = new byte[1024];
    Context ctx = new Context();
    ctx.Buffer = buffer;
    ctx.InputStream = responseStream;
    ctx.OutputStream = new MemoryStream(); // change this to be your output stream

    responseStream.BeginRead(buffer, 0, buffer.Length; new AsyncCallback(ReadCallback), ctx);
}

void ReadCallback(IAsyncResult ar)
{
    Context ctx = (Context)ar.AsyncState;
    int read = 0;
    try {
        read = ctx.InputStream.EndRead(ar);
        if (read > 0)
        {
            ctx.OutputStream.Write(ctx.Buffer, 0, read);
            // kick off another async read
            ctx.InputStream.BeginRead(ctx.Buffer, 0, ctx.Buffer.Length, new AsyncCallback(ReadCallback), ctx);
        } else {
            ctx.InputStream.Close();
            ctx.OutputStream.Close();
        }
     } catch {
     }
}

}
0 голосов
/ 06 мая 2010

Чтобы создать синхронизированный многопоточный загрузчик, вам нужно создать правильную структуру данных, и вам потребуется больше, чем просто byte[] данных.

Шаги:

  1. Разбейте свою загрузку на несколько частей в зависимости от размера контента или загрузчика контента фиксированного размера около 500 КБ, загруженных каждым потоком.
  2. При запуске потока укажите индекс чанка - 1-я часть, 2-я часть и т. Д.
  3. Когда загрузка доступна, выровняйте окончательный контент на основе индекса чанка.

Если вам интересно, вы можете взглянуть на код prozilla (C, на основе Linux - at) или Axel.

...