Извлечение огромных наборов данных через узкие места вывода / конечной точки, используя многопоточность разделяй и властвуй. C# - PullRequest
1 голос
/ 08 апреля 2020

Я попал в ситуацию, когда мне нужно исследовать относительно огромный набор данных через заблокированный вывод, который представляет собой одиночную запись на основе идентификатора, возвращающую конечную точку. Повторение этой конечной точки для последовательной очистки всех данных, очевидно, не является практическим вариантом, поэтому я попытался создать распараллеленные множественные процессы, каждый из которых имеет свой собственный диапазон идентификаторов для работы. Вот что мне удалось сделать, что до сих пор не работает идеально. «См. Вопросы внизу»

Предисловие к коду ↴

В этой демонстрации я использовал http://numbersapi.com/. Просто вы вводите число, и API возвращает случайный факт о нем из фактов, связанных с ним. Если есть какие-либо другие значения, он возвращает рандомизированную одну из 4 фраз, содержащих число без фактов.
Примеры API:
Число, которое имеет хотя бы один факт → http://numbersapi.com/1
Число, с которым не связаны факты → http://numbersapi.com/123456789

Приложение

ThreadRange : элементы, обрабатываемые одним потоком

Код ↴

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Authentication;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

namespace CSharp
{
    class Program
    {
        private static Stopwatch CrawlerStopWatch = new Stopwatch();
        private static HttpClient HttpClientCrawler = null;
        private static HttpClientHandler HttpClientCrawlerHttpsHandler = new HttpClientHandler() { SslProtocols = SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls };
        private static BlockingCollection<Thread> CrawlerThreads = new BlockingCollection<Thread>();
        private static BlockingCollection<String> CrawlerFailuresBlockingCollection = new BlockingCollection<String>();
        private static BlockingCollection<String> CrawlerHitsBlockingCollection = new BlockingCollection<String>();
        private static BlockingCollection<String> CrawlerMissesBlockingCollection = new BlockingCollection<String>();
        private static String BaseUrl = "http://numbersapi.com/";
        private static String FullUrl = String.Concat(BaseUrl, "{0}");
        private static long ThreadsCount = 1;
        private static long ThreadRange = 10;
        private static long Offset = 2;
        private static long ItemsToProcess = ThreadsCount * ThreadRange;
        private static int MaxUrlLength = String.Format(FullUrl, ItemsToProcess).Length;
        private static ReaderWriterLockSlim CrawlerReaderWriterLockSlim = new ReaderWriterLockSlim();
        private static String CrawlerResultantFileName = "z.CrawlerResult.txt";
        public static void Main(String[] args)
        {
            CrawlerStopWatch.Start();
            //### Managing HttpClient ###/
            ServicePointManager.DefaultConnectionLimit = 50;
            if (Regex.IsMatch(BaseUrl, @"^https.*$")) { HttpClientCrawler = new HttpClient(HttpClientCrawlerHttpsHandler); } else { HttpClientCrawler = new HttpClient(); }
            HttpClientCrawler.BaseAddress = new Uri(BaseUrl);
            HttpClientCrawler.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/html"));
            //### Managing HttpClient ###/
            InitiateCrawler(BaseUrl, ThreadsCount, ThreadRange, Offset);
        }
        //### Crawler Methods ###
        public static void InitiateCrawler(String BaseUrl, long ThreadsCount, long ThreadRange, long Offset = 0)
        {
            Console.WriteLine("###############################");
            Console.WriteLine($"Commenced crawling the <{BaseUrl}> endpoint and working on <{ItemsToProcess}> items by creating <{ThreadsCount}> threads which each working on <{ThreadRange}> items.");
            while (CrawlerThreads.Count < ThreadsCount)
            {
                long Start = Offset + ThreadRange * CrawlerThreads.Count + 1;
                long End = Offset + ThreadRange * (CrawlerThreads.Count + 1);
                Thread CrawlerThread = new Thread(() => Crawl(Start, End));
                CrawlerThreads.Add(CrawlerThread);
                CrawlerThread.Start();
            }
            Task FinalizeCrawlerTask = Task.Run(() => { foreach (Thread CrawlerThread in CrawlerThreads) { CrawlerThread.Join(); } FinalizeCrawler(); });
            FinalizeCrawlerTask.Wait();
        }
        public static void Crawl(long Start, long End)
        {
            long Current = Start;
            while (Current <= End)
            {
                String CurrentUrlParamters = String.Format("{0}", Current);
                String CurrentUrl = $"{HttpClientCrawler.BaseAddress.AbsoluteUri}{CurrentUrlParamters}";
                String CurrentPageContent = "";
                HttpResponseMessage HttpResponseMessage = HttpClientCrawler.GetAsync(CurrentUrlParamters).Result;
                if (HttpResponseMessage.IsSuccessStatusCode)
                {
                    CurrentPageContent = Encoding.UTF8.GetString(HttpResponseMessage.Content.ReadAsByteArrayAsync().Result);
                    if (isResultRelevant(CurrentPageContent)) { HandleCrawlerRelevantResult(CurrentUrl, CurrentPageContent); } else { HandleCrawlerIrrelevantResult(CurrentUrl, CurrentPageContent); }
                }
                else
                {
                    HandleCrawlerFailure(CurrentUrl, HttpResponseMessage);
                }
                Current++;
            }
        }
        public static void HandleCrawlerFailure(String Url, HttpResponseMessage HttpResponseMessage)
        {
            CrawlerFailuresBlockingCollection.Add(Url);
            int ProcessedItems = CrawlerHitsBlockingCollection.Count + CrawlerMissesBlockingCollection.Count + CrawlerFailuresBlockingCollection.Count;
            Console.WriteLine($"[Item #{ProcessedItems.ToString().PadRight(ItemsToProcess.ToString().Length)}] {Url.PadRight(MaxUrlLength)} returned {(int)HttpResponseMessage.StatusCode} Code | {HttpResponseMessage.ReasonPhrase}");
        }
        public static Boolean isResultRelevant(String Content)
        {
            Boolean IsRelevant = true;
            String[] RegularExpressionsArray = new string[]
            {
                @"^[\d]+ is a boring number\.$",
                @"^[\d]+ is an uninteresting number\.$",
                @"^[\d]+ is an unremarkable number\.$",
                @"^[\d]+ is a number for which we're missing a fact (submit one to numbersapi at google mail!)\.$",
            };
            foreach (String RegularExpression in RegularExpressionsArray) { if (Regex.IsMatch(Content, RegularExpression)) { IsRelevant = false; break; } }
            return IsRelevant;
        }
        public static void HandleCrawlerRelevantResult(String Url, String Content)
        {
            CrawlerResultantFileWriteLine(Url);
            CrawlerHitsBlockingCollection.Add(Url);
            int ProcessedItems = CrawlerHitsBlockingCollection.Count + CrawlerMissesBlockingCollection.Count + CrawlerFailuresBlockingCollection.Count;
            Console.WriteLine($"[Item #{ProcessedItems.ToString().PadRight(ItemsToProcess.ToString().Length)}] {Url.PadRight(MaxUrlLength)} is relevant");
        }
        public static void HandleCrawlerIrrelevantResult(String Url, String Content)
        {
            CrawlerMissesBlockingCollection.Add(Url);
            int ProcessedItems = CrawlerHitsBlockingCollection.Count + CrawlerMissesBlockingCollection.Count + CrawlerFailuresBlockingCollection.Count;
            Console.WriteLine($"[Item #{ProcessedItems.ToString().PadRight(ItemsToProcess.ToString().Length)}] {Url.PadRight(MaxUrlLength)} is irrelevant");
        }
        public static void FinalizeCrawler()
        {
            CrawlerStopWatch.Stop();
            TimeSpan TimeSpan = TimeSpan.FromMilliseconds(CrawlerStopWatch.ElapsedMilliseconds);
            String TimeLapseInformation = String.Format("{0:D2}h:{1:D2}m:{2:D2}s:{3:D3}ms",
                                    TimeSpan.Hours,
                                    TimeSpan.Minutes,
                                    TimeSpan.Seconds,
                                    TimeSpan.Milliseconds);
            Console.WriteLine($"Crawling finished in {TimeLapseInformation}.");
            Console.WriteLine($"<{CrawlerFailuresBlockingCollection.Count + CrawlerHitsBlockingCollection.Count + CrawlerMissesBlockingCollection.Count}> out of <{ItemsToProcess}> items have been crawled having <{CrawlerHitsBlockingCollection.Count}> relevant items, <{CrawlerMissesBlockingCollection.Count}> irrelevant items and <{CrawlerFailuresBlockingCollection.Count}> failures.");
            Console.WriteLine("###############################");
        }
        //### Crawler Methods ###
        //### Auxiliary Methods ###
        public static void CrawlerResultantFileWriteLine(String Line)
        {
            CrawlerReaderWriterLockSlim.EnterWriteLock();
            try { using (StreamWriter StreamWriter = File.AppendText(String.Concat(AppDomain.CurrentDomain.BaseDirectory, "\\", CrawlerResultantFileName))) { StreamWriter.WriteLine(Line); StreamWriter.Close(); } }
            finally { CrawlerReaderWriterLockSlim.ExitWriteLock(); }
        }
        //### Auxiliary Methods ###
    }
}

Вопросы

(Q1.) Является ли представленный подход оптимальным?


(Q2.) В функции FinalizeCrawler (), если количество коллективных элементов для обработки во всех потоках слишком велико, количество обработанных элементов не равно назначенному количеству элементов для обработки в InitializeCrawler Вызов (UrlHandle, ThreadsCount, ThreadRange) в main как:
( CrawlerFailuresBlockingCollection.Count + CrawlerHits.Count + CrawlerMisses.Count ) должен быть равен ( ThreadsCount * ThreadRange )
Некоторые элементы пропущены? если да, то почему?

(A2.) Как указал Джошуа Робинсон , я использовал System.Collections.Generi c .Список небезопасной коллекции для изменения из нескольких потоков. Я переключился на System.Collections.Concurrent.BlockingCollection, протестировал его с большим числом, таким как 10000, и он заработал.

(Q3.) Как точно определить оптимальный ThreadRange и ThreadsCount комбинация в отношении производительности. Какой из них?
- (A) Минимальное количество потоков с максимальным диапазоном потоков
- (B) Низкое количество потоков с высоким диапазоном потоков
- (C) Средний диапазон потоков с количеством медианных потоков
- (B) High ThreadsCount с низким ThreadRange
- (E) Макс. ThreadsCount с min. ThreadRange
Если быть более сложным, если я хочу обработать 10000 элементов с этой конечной точки, это все возможные комбинации, удовлетворяющие необходимому предикату ThreadsCount * ThreadRange = 10000 , что я выбрал только из них.
Что является наиболее оптимальным? И почему?
<1>     ThreadsCount * <10000> ThreadRange
<2>     ThreadsCount * <5000>  ThreadRange
<4>     ThreadsCount * <2500>  ThreadRange
<5>     ThreadsCount * <2000>  ThreadRange
<8>     ThreadsCount * <1250>  ThreadRange
<10>    ThreadsCount * <1000>  ThreadRange
<16>    ThreadsCount * <625>   ThreadRange
<20>    ThreadsCount * <500>   ThreadRange
<25>    ThreadsCount * <400>   ThreadRange
<40>    ThreadsCount * <250>   ThreadRange
<50>    ThreadsCount * <200>   ThreadRange
<80>    ThreadsCount * <125>   ThreadRange
<100>   ThreadsCount * <100>   ThreadRange // Exact Median → square root of items to process if integer
<125>   ThreadsCount * <80>    ThreadRange
<200>   ThreadsCount * <50>    ThreadRange
<250>   ThreadsCount * <40>    ThreadRange
<400>   ThreadsCount * <25>    ThreadRange
<500>   ThreadsCount * <20>    ThreadRange
<625>   ThreadsCount * <16>    ThreadRange
<1000>  ThreadsCount * <10>    ThreadRange
<1250>  ThreadsCount * <8>     ThreadRange
<2000>  ThreadsCount * <5>     ThreadRange
<2500>  ThreadsCount * <4>     ThreadRange
<5000>  ThreadsCount * <2>     ThreadRange
<10000> ThreadsCount * <1>     ThreadRange

Ответы [ 2 ]

1 голос
/ 10 апреля 2020

(Q1.) Является ли представленный подход оптимальным?

У меня есть предложение, как улучшить текущее решение. В настоящее время все сканирующие потоки обрабатывают свои http-ответы самостоятельно. Более того, когда они обрабатывают свои http-ответы, они пытаются получить эксклюзивную блокировку (ReaderWriterLockSlim.EnterWriteLock) для записи результатов в файл.

Я предлагаю сделать так, чтобы сканирующие потоки отправляли только http-запросы и сразу добавляли http-ответы в какую-то коллекцию. , А другой поток (назовем его потоком обработки) будет обрабатывать эти http-ответы (извлекать данные, анализировать их, записывать в файл и т. Д. c). Такой подход даст нам следующие преимущества:

  1. Обход потоков не будет тратить свое время на обработку ответов http. Они будут выполнять только самые дорогие операции: отправлять http-запросы и получать http-ответы.
  2. Для обработки http-ответов не потребуется эксклюзивная блокировка. Поток обработки сможет обрабатывать ответы http без установки каких-либо блокировок.

Если в вашей реальной программе обработка ответов http занимает больше времени, чем в предоставленном примере, то эти преимущества уменьшат общее выполнение время выполнения программы.

Вот изменения, которые необходимо внести в вашу программу для реализации этого предложения:

// Now we can declare this collections as simple Lists, not BlockingCollections.
private static List<Thread> CrawlerThreads = new List<Thread>();
private static List<String> CrawlerFailuresBlockingCollection = new List<String>();
private static List<String> CrawlerHitsBlockingCollection = new List<String>();
private static List<String> CrawlerMissesBlockingCollection = new List<String>();

// I found out that for my environment this values are optimal and give better performance.
private static long ThreadsCount = 200;
private static long ThreadRange = 50;

// Add this collection. Crawling threads will add http responses into it.
// And processing thread will process them.
private static BlockingCollection<HttpResponseMessage> ResponsesToProcess = new BlockingCollection<HttpResponseMessage>();

public static void Main(String[] args)
{
   ...
   // If we use 200 crawling threads then we should set
   // DefaultConnectionLimit=200 to make possible for all
   // crawling threads to make http responses simultaneously
   // without waiting for available connections.
   ServicePointManager.DefaultConnectionLimit = 200;
   ...
}

public static void InitiateCrawler(String BaseUrl, long ThreadsCount, long ThreadRange, long Offset = 0)
{
    Console.WriteLine("###############################");
    Console.WriteLine($"Commenced crawling the <{BaseUrl}> endpoint and working on <{ItemsToProcess}> items by creating <{ThreadsCount}> threads which each working on <{ThreadRange}> items.");

    while (CrawlerThreads.Count < ThreadsCount)
    {
        long Start = Offset + ThreadRange * CrawlerThreads.Count + 1;
        long End = Offset + ThreadRange * (CrawlerThreads.Count + 1);

        Thread CrawlerThread = new Thread(() => Crawl(Start, End));
        CrawlerThreads.Add(CrawlerThread);
        CrawlerThread.Start();
    }

    Task FinalizeCrawlerTask = Task.Run(() =>
    { 
        foreach (Thread CrawlerThread in CrawlerThreads) 
        { 
            CrawlerThread.Join(); 
        }
        // Notify processing thread that there is no more
        // http responses to process.
        ResponsesToProcess.CompleteAdding();
    });

    // Processing thread.
    Task ProcessResponsesThread = Task.Run(() =>
    {
        foreach (var HttpResponseMessage in ResponsesToProcess.GetConsumingEnumerable())
        {
            string CurrentUrl = HttpResponseMessage.RequestMessage.RequestUri.AbsoluteUri;

            if (HttpResponseMessage.IsSuccessStatusCode)
            {
                string CurrentPageContent = Encoding.UTF8.GetString(HttpResponseMessage.Content.ReadAsByteArrayAsync().Result);
                if (isResultRelevant(CurrentPageContent))
                {
                    HandleCrawlerRelevantResult(CurrentUrl, CurrentPageContent);
                }
                else
                {
                    HandleCrawlerIrrelevantResult(CurrentUrl, CurrentPageContent);
                }
            }
            else
            {
                HandleCrawlerFailure(CurrentUrl, HttpResponseMessage);
            }
        }
    });

    FinalizeCrawlerTask.Wait();
    ProcessResponsesThread.Wait();

    // Now we print the results of the program here because we must
    // ensure that finalizer and processing threads have finished.
    FinalizeCrawler();
}

public static void Crawl(long Start, long End)
{
    long Current = Start;
    while (Current <= End)
    {
        String CurrentUrlParamters = String.Format("{0}", Current);
        String CurrentUrl = $"{HttpClientCrawler.BaseAddress.AbsoluteUri}{CurrentUrlParamters}";
        String CurrentPageContent = "";
        HttpResponseMessage HttpResponseMessage = HttpClientCrawler.GetAsync(CurrentUrlParamters).Result;
        // Now crawling thread only adds http response into collection
        // of http responses to process. It doesn't process http responses.
        ResponsesToProcess.Add(HttpResponseMessage);
        Current++;
    }
}

public static void CrawlerResultantFileWriteLine(String Line)
{
    // Now we don't need a lock. We can delete it.
    using (StreamWriter StreamWriter = File.AppendText(String.Concat(AppDomain.CurrentDomain.BaseDirectory, "\\", CrawlerResultantFileName))) 
    {
        StreamWriter.WriteLine(Line); 
        StreamWriter.Close(); 
    } 
}

(Q3 .) Как определить оптимальную комбинацию ThreadRange и ThreadsCount в отношении производительности. Какой из них?

Я проверил ваш образец и обнаружил, что не существует практического правила, как выбрать ThreadsCount. Вот результаты тестов в моей среде (P C):

  • ThreadsCount = 20, ExecutionTime = 07m:40s
  • ThreadsCount = 25, ExecutionTime = 07m:00s
  • ThreadsCount = 50, ExecutionTime = 03m:30s
  • ThreadsCount = 100, ExecutionTime = 01m:30s
  • ThreadsCount = 200, ExecutionTime = 01m:10s
  • Дальнейшее увеличение ThreadsCount не улучшает ExecutionTime

Я думаю, что для нахождения оптимального значения ThreadsCount вы должны протестировать различные значения этого параметра в вашей среде и выберите лучшее.

Единственное, что я хочу выделить, это то, что важно установить ServicePointManager.DefaultConnectionLimit равным или большим, чем ThreadsCount. Если ServicePointManager.DefaultConnectionLimit меньше ThreadsCount, то некоторые потоки будут ожидать доступных подключений без выполнения полезной работы.

0 голосов
/ 17 апреля 2020

Это то, что я написал бы, чтобы очистить кучу запросов. Это достаточно эффективно. По сравнению с тем, что вы написали, я также думаю, что этот подход намного проще. Он использует Задачи для использования преимуществ параллелизма ЦП (в этом примере не ЦП привязан, он связан с отзывчивостью удаленного сервера), и вы можете вручную установить количество одновременных подключений для использования. (Я кодировал это с 6 , который является типичным числом соединений, которые современный веб-браузер разрешил бы к каждому домену.)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

namespace ScrapeExample
{
    class Program
    {
        private const int NumberOfSimultaneousConnections = 6;
        private const int MaxResponseContentBufferSize = 4096;
        private const string uriTemplate = "http://numbersapi.com/{0}";
        private const int FirstIndexToRequest = 1;
        private const int LastIndexToRequest = 100;

        static void Main(string[] args)
        {
            HttpClientHandler hch = new HttpClientHandler() { Proxy = null, UseProxy = false };
            HttpClient[] clients = Enumerable.Range(0, NumberOfSimultaneousConnections).Select(i =>
                new HttpClient(hch) { MaxResponseContentBufferSize = MaxResponseContentBufferSize }
            ).ToArray();

            List<Task<string>> tasks = new List<Task<string>>();
            for (int i = FirstIndexToRequest; i <= LastIndexToRequest; ++i) {
                string uri = string.Format(uriTemplate, i);
                tasks.Add(ProcessURLAsync(uri, clients[i % NumberOfSimultaneousConnections]));
            }
            Task.WaitAll(tasks.ToArray());
            string[] results = tasks.Select(t => t.Result).ToArray();
            Console.WriteLine(string.Join(Environment.NewLine, results));
        }

        private async static Task<string> ProcessURLAsync(string uri, HttpClient client)
        {
            HttpResponseMessage response = await client.GetAsync(uri);
            response.EnsureSuccessStatusCode();
            byte[] content = await response.Content.ReadAsByteArrayAsync();
            return Encoding.UTF8.GetString(content);
        }
    }
}

ВАЖНО:

На практике, Я обнаружил, что этот numbersapi.com легко перегрузится после нескольких сотен запросов и начнет возвращать 502 кода ошибки (Bad Gateway / Service временно перегружен). Таким образом, в этом дизайне отсутствует то, что в случае ошибки повторяет каждый элемент. Я просто написал EnsureSuccessStatusCode, который выдает исключение по ошибке. Но это уже превосходит удаленный API, который вы используете в качестве примера.

...