Как добавить задачи очереди в пул потоков в C # - PullRequest
2 голосов
/ 20 марта 2011

Я все еще новичок в этой многопоточности.Допустим, у меня есть 50000 URL-адресов, и я хочу получать содержимое этих URL-адресов одновременно, например, обрабатывая каждые 10 URL-адресов вместе.затем, как только один из этих URL-адресов завершает обработку, программа должна добавить еще один из списка очередей, пока не завершит обработку всех URL-адресов в списке.теперь, как я могу сделать это с C # .. вот код, который я делаю до сих пор ..

 class RequestState
        {
            public WebRequest Request;

        // holds the request 
        public object Data;

        // store any data in this 
        public string SiteUrl;

        // holds the UrlString to match up results (Database lookup, etc). 

        public RequestState(WebRequest request, object data, string siteUrl)
        {
            this.Request = request;
            this.Data = data;
            this.SiteUrl = siteUrl;
        }
    }

    private void PROCESS_URLS_Click(object sender, EventArgs e)
    {
        //run the process
        process_URLs();
    }

private int ThreadsCount = 0;

  private void process_URLs()
    {
       //count threads number
        ThreadsCount = URLS_LISTVIEW.SelectedItems.Count;

       //loop through all URLs in listview
        for (int i = 0; i < URLS_LISTVIEW.SelectedItems.Count; i++)
        {
            try
            {
                //get url string
                string myURLs = URLS_LISTVIEW.SelectedItems[i].SubItems[0].Text.Trim();

                // for each URL in the collection...
                WebRequest request = HttpWebRequest.Create(myURLs);
                request.Method = "GET";
                object data = new object();

                RequestState state = new RequestState(request, data, myURLs);
                IAsyncResult result = request.BeginGetResponse(new AsyncCallback(UpdateItem), state);
                ThreadPool.RegisterWaitForSingleObject(result.AsyncWaitHandle, new WaitOrTimerCallback(ScanTimeoutCallback), state, (30 * 1000), true);

            }
            catch (ThreadStateException es)
            {
                MessageBox.Show(es.Message);
            }

        }



    }




 private void UpdateItem(IAsyncResult result)
    {
        RequestState state = (RequestState)result.AsyncState;
        WebRequest request = (WebRequest)state.Request;
        try
        {// grab the custom state object
            // get the Response
            HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result);

            // process the response...
            Stream s = (Stream)response.GetResponseStream();
            StreamReader readStream = new StreamReader(s);

            //data grabbed
            string dataString = readStream.ReadToEnd();
            response.Close();
            s.Close();
            readStream.Close();



        //finished grabbing content for this thread.
        ThreadsCount = ThreadsCount - 1;


        //if all threads finished running then execute final code to tell the user the process finished
        if (ThreadsCount < 1)
        {
            //show message
            MessageBox.Show("finished");
        }

       // Thread.Sleep(400);

    }





private static void ScanTimeoutCallback(object state, bool timedOut)
    {
        if (timedOut)
        {
            RequestState reqState = (RequestState)state;

            if (reqState != null)
                reqState.Request.Abort();


        }
    }

любые идеи будут оценены:)

С уважением,

Ответы [ 2 ]

6 голосов
/ 20 марта 2011

Посмотрите на TPL, есть возможность указать максимальный параллелизм:

List<string> UriList = new List<string>();
...
Parallel.ForEach(UriList, 
                 new ParallelOptions() {MaxDegreeOfParallelism=10}, 
                 (x) =>
{
    ProcessUrl(x);
});

Это будет обрабатывать не более 10 URL параллельно, поскольку мы используем перегрузку Parallel.Foreach(), которая позволяет нам указать MaxDegreeOfParallelism.

Изменить:

Вот простой пример, который загружает Html из http://google.com 50 раз параллельно (но не более 10 одновременно) и сохраняет результаты в массиве:

List<string> UriList = new List<string>();
for(int i =0;i<50;i++)
    UriList.Add("http://google.com");

string[] HtmlResults = new string[UriList.Count];

Parallel.ForEach(UriList, 
                 new ParallelOptions() { MaxDegreeOfParallelism = 10 }, 
                 (url, i, j) =>
{
    WebClient wc = new WebClient();
    HtmlResults[j] = wc.DownloadString(url);
});

Не для того, чтобы создавать больше путаницы, но в вашем конкретном случае PLINQ также будет работать очень хорошо, поскольку нет никаких зависимостей между элементом для обработки, и у вас есть фактический результат, что URL-адрес "преобразуется" в :

var htmlResultList = UriList.AsParallel()
                            .WithDegreeOfParallelism(10)
                            .AsOrdered()
                            .Select(url => { WebClient wc = new WebClient(); return wc.DownloadString(url); })
                            .ToList();
3 голосов
/ 20 марта 2011

(Это должен быть комментарий под @BrokenGlass, но я пока не могу оставлять комментарии)

Вы можете взглянуть на эту статью о том, как использовать параллельную обработку и PLINQ, чтобы делать то, что вы ищете. Весь набор статей, которые ему предшествуют, также содержат полезную информацию.

Редактировать: если это автономно, создайте новый поток, чтобы запустить эту часть в фоновом режиме, чтобы это не привело к не отвечающему пользовательскому интерфейсу.

Редактировать 2: При желании вы также можете бросить свои строки в ConcurrentQueue , чтобы вы могли добавлять элементы из пользовательского интерфейса при их поиске.

...