ConcurrentBag Пропуск некоторых предметов C# - PullRequest
0 голосов
/ 26 февраля 2020

Я использую concurrentbag для очистки URL-адресов, сейчас он работает нормально для 500/100 URL, но когда я пытаюсь очистить 8000 URL-адресов. Все URL не обрабатываются и некоторые элементы ожидают в inputQueue.

Но я использую while (! InputQueue.IsEmpty). Таким образом, он должен запускать l oop до тех пор, пока какие-либо элементы не появятся во входной очереди.

Я хочу запустить максимум 100 потоков. Итак, я сначала создал 100 потоков и вызвал метод «Run ()», и внутри этого метода я запускаю al oop, чтобы принимать элементы до тех пор, пока элементы не выходят из очереди ввода, и добавлять в очередь вывода после очистки URL-адресов.

public ConcurrentBag<Data> inputQueue = new ConcurrentBag<Data>();
    public ConcurrentBag<Data> outPutQueue = new ConcurrentBag<Data>();

    public List<Data> Scrapes(List<Data> scrapeRequests)
    {
        ServicePointManager.ServerCertificateValidationCallback += (sender, cert, chain, sslPolicyErrors) => true;
        string proxy_session_id = new Random().Next().ToString();

        numberOfRequestSent = 0;

        watch.Start();

        foreach (var sRequest in scrapeRequests)
        {
            inputQueue.Add(sRequest);
        }
        //inputQueue.CompleteAdding();

        var taskList = new List<Task>();
        for (var i = 0; i < n_parallel_exit_nodes; i++) //create 100 threads only
        {
            taskList.Add(Task.Factory.StartNew(async () =>
            {
               await Run();
            }, TaskCreationOptions.RunContinuationsAsynchronously));
        }

        Task.WaitAll(taskList.ToArray());   //Waiting

        //print result
        Console.WriteLine("Number Of URLs Found - {0}", scrapeRequests.Count);
        Console.WriteLine("Number Of Request Sent - {0}", numberOfRequestSent);

        Console.WriteLine("Input Queue - {0}", inputQueue.Count);

        Console.WriteLine("OutPut Queue - {0}", outPutQueue.ToList().Count);
        Console.WriteLine("Success - {0}", outPutQueue.ToList().Where(x=>x.IsProxySuccess==true).Count().ToString());
        Console.WriteLine("Failed - {0}", outPutQueue.ToList().Where(x => x.IsProxySuccess == false).Count().ToString());
        Console.WriteLine("Process Time In - {0}", watch.Elapsed);

        return outPutQueue.ToList();
    }


    async Task<string> Run()
    {
        while (!inputQueue.IsEmpty)
        {
            var client = new Client(super_proxy_ip, "US");

            if (!client.have_good_super_proxy())
                client.switch_session_id();
            if (client.n_req_for_exit_node == switch_ip_every_n_req)
                client.switch_session_id();

            var scrapeRequest = new ProductResearch_ProData();
            inputQueue.TryTake(out scrapeRequest);

            try
            {
                numberOfRequestSent++;

                // Console.WriteLine("Sending request for - {0}", scrapeRequest.URL);
                scrapeRequest.HTML = client.DownloadString((string)scrapeRequest.URL);
                //Console.WriteLine("Response done for - {0}", scrapeRequest.URL);

                scrapeRequest.IsProxySuccess = true;

                outPutQueue.Add(scrapeRequest); //add object to output queue

                //lumanti code
                client.handle_response();
            }
            catch (WebException e)
            {
                Console.WriteLine("Failed");

                scrapeRequest.IsProxySuccess = false;
                Console.WriteLine(e.Message);
                outPutQueue.Add(scrapeRequest); //add object to output queue

                //lumanti code
                client.handle_response(e);
            }

            client.clean_connection_pool();
            client.Dispose();
        }

        return await Task.Run(() => "Done");
    }

1 Ответ

0 голосов
/ 26 февраля 2020

Здесь есть несколько проблем, но, похоже, ни одна из них не является причиной того, что inputQueue.Count имеет ненулевое значение в конце. В любом случае я хотел бы указать на проблемы, которые я вижу.

var taskList = new List<Task>();
for (var i = 0; i < n_parallel_exit_nodes; i++) // create 100 threads only
{
    taskList.Add(Task.Factory.StartNew(async () =>
    {
        await Run();
    }, TaskCreationOptions.RunContinuationsAsynchronously));
}

Метод Task.Factory.StartNew не понимает asyn c делегатов, поэтому, когда он вызывается с asyn c lambda в качестве аргумента возвращает вложенную задачу. В этом случае он возвращает Task<Task<string>>. Вы сохраняете эту вложенную задачу в коллекции List<Task>, что возможно потому, что тип Task<TResult> наследуется от типа Task, но при этом вы теряете возможность ждать завершения (и получаете результат) внутренней задачи. Вы держите только ссылку на внешнюю задачу. Чудесным образом это не является проблемой в этом случае (обычно это так), поскольку внешняя задача выполняет всю работу, а внутренняя задача практически ничего не делает (кроме использования потока пула потоков для возврата строки "Done", которая на самом деле не является где угодно).

Вы также не прикрепляете никаких продолжения к внешним задачам, поэтому флаг TaskCreationOptions.RunContinuationsAsynchronously кажется избыточным.

// create 100 threads only

Вы не создаете 100 потоков, вы создать 100 задач. Эти задачи запланированы в ThreadPool, который будет немедленно истощен, потому что задачи долго выполняются, и начнет вводить один новый поток каждые 500 мсек c, пока все запланированные задачи не будут назначены на поток.

var scrapeRequest = new ProductResearch_ProData();
inputQueue.TryTake(out scrapeRequest);

Здесь вы создаете экземпляр объекта типа ProductResearch_ProData, который немедленно отбрасывается и становится пригодным для сбора мусора в следующей строке. Метод TryTake вернет объект, извлеченный из сумки, или null, если сумка пуста. Вы игнорируете возвращаемое значение метода TryTake, которое вполне может быть false, поскольку при этом мешок мог быть опорожнен другим работником, а затем переходите к scrapeRequest, который может иметь нулевое значение, что приводит к в этом случае NullReferenceException.

Стоит отметить, что вы извлекаете объект типа ProductResearch_ProData из ConcurrentBag<Data>, так что либо класс Data наследуется от базового класса ProductResearch_ProData, либо это ошибка транскрипции в коде.

...