Я попал в ситуацию, когда мне нужно исследовать относительно огромный набор данных через заблокированный вывод, который представляет собой одиночную запись на основе идентификатора, возвращающую конечную точку. Повторение этой конечной точки для последовательной очистки всех данных, очевидно, не является практическим вариантом, поэтому я попытался создать распараллеленные множественные процессы, каждый из которых имеет свой собственный диапазон идентификаторов для работы. Вот что мне удалось сделать, что до сих пор не работает идеально. «См. Вопросы внизу»
Предисловие к коду ↴
В этой демонстрации я использовал http://numbersapi.com/. Просто вы вводите число, и API возвращает случайный факт о нем из фактов, связанных с ним. Если есть какие-либо другие значения, он возвращает рандомизированную одну из 4 фраз, содержащих число без фактов.
Примеры API:
Число, которое имеет хотя бы один факт → http://numbersapi.com/1
Число, с которым не связаны факты → http://numbersapi.com/123456789
Приложение
ThreadRange : элементы, обрабатываемые одним потоком
Код ↴
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Authentication;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
namespace CSharp
{
class Program
{
private static Stopwatch CrawlerStopWatch = new Stopwatch();
private static HttpClient HttpClientCrawler = null;
private static HttpClientHandler HttpClientCrawlerHttpsHandler = new HttpClientHandler() { SslProtocols = SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls };
private static BlockingCollection<Thread> CrawlerThreads = new BlockingCollection<Thread>();
private static BlockingCollection<String> CrawlerFailuresBlockingCollection = new BlockingCollection<String>();
private static BlockingCollection<String> CrawlerHitsBlockingCollection = new BlockingCollection<String>();
private static BlockingCollection<String> CrawlerMissesBlockingCollection = new BlockingCollection<String>();
private static String BaseUrl = "http://numbersapi.com/";
private static String FullUrl = String.Concat(BaseUrl, "{0}");
private static long ThreadsCount = 1;
private static long ThreadRange = 10;
private static long Offset = 2;
private static long ItemsToProcess = ThreadsCount * ThreadRange;
private static int MaxUrlLength = String.Format(FullUrl, ItemsToProcess).Length;
private static ReaderWriterLockSlim CrawlerReaderWriterLockSlim = new ReaderWriterLockSlim();
private static String CrawlerResultantFileName = "z.CrawlerResult.txt";
public static void Main(String[] args)
{
CrawlerStopWatch.Start();
//### Managing HttpClient ###/
ServicePointManager.DefaultConnectionLimit = 50;
if (Regex.IsMatch(BaseUrl, @"^https.*$")) { HttpClientCrawler = new HttpClient(HttpClientCrawlerHttpsHandler); } else { HttpClientCrawler = new HttpClient(); }
HttpClientCrawler.BaseAddress = new Uri(BaseUrl);
HttpClientCrawler.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/html"));
//### Managing HttpClient ###/
InitiateCrawler(BaseUrl, ThreadsCount, ThreadRange, Offset);
}
//### Crawler Methods ###
public static void InitiateCrawler(String BaseUrl, long ThreadsCount, long ThreadRange, long Offset = 0)
{
Console.WriteLine("###############################");
Console.WriteLine($"Commenced crawling the <{BaseUrl}> endpoint and working on <{ItemsToProcess}> items by creating <{ThreadsCount}> threads which each working on <{ThreadRange}> items.");
while (CrawlerThreads.Count < ThreadsCount)
{
long Start = Offset + ThreadRange * CrawlerThreads.Count + 1;
long End = Offset + ThreadRange * (CrawlerThreads.Count + 1);
Thread CrawlerThread = new Thread(() => Crawl(Start, End));
CrawlerThreads.Add(CrawlerThread);
CrawlerThread.Start();
}
Task FinalizeCrawlerTask = Task.Run(() => { foreach (Thread CrawlerThread in CrawlerThreads) { CrawlerThread.Join(); } FinalizeCrawler(); });
FinalizeCrawlerTask.Wait();
}
public static void Crawl(long Start, long End)
{
long Current = Start;
while (Current <= End)
{
String CurrentUrlParamters = String.Format("{0}", Current);
String CurrentUrl = $"{HttpClientCrawler.BaseAddress.AbsoluteUri}{CurrentUrlParamters}";
String CurrentPageContent = "";
HttpResponseMessage HttpResponseMessage = HttpClientCrawler.GetAsync(CurrentUrlParamters).Result;
if (HttpResponseMessage.IsSuccessStatusCode)
{
CurrentPageContent = Encoding.UTF8.GetString(HttpResponseMessage.Content.ReadAsByteArrayAsync().Result);
if (isResultRelevant(CurrentPageContent)) { HandleCrawlerRelevantResult(CurrentUrl, CurrentPageContent); } else { HandleCrawlerIrrelevantResult(CurrentUrl, CurrentPageContent); }
}
else
{
HandleCrawlerFailure(CurrentUrl, HttpResponseMessage);
}
Current++;
}
}
public static void HandleCrawlerFailure(String Url, HttpResponseMessage HttpResponseMessage)
{
CrawlerFailuresBlockingCollection.Add(Url);
int ProcessedItems = CrawlerHitsBlockingCollection.Count + CrawlerMissesBlockingCollection.Count + CrawlerFailuresBlockingCollection.Count;
Console.WriteLine($"[Item #{ProcessedItems.ToString().PadRight(ItemsToProcess.ToString().Length)}] {Url.PadRight(MaxUrlLength)} returned {(int)HttpResponseMessage.StatusCode} Code | {HttpResponseMessage.ReasonPhrase}");
}
public static Boolean isResultRelevant(String Content)
{
Boolean IsRelevant = true;
String[] RegularExpressionsArray = new string[]
{
@"^[\d]+ is a boring number\.$",
@"^[\d]+ is an uninteresting number\.$",
@"^[\d]+ is an unremarkable number\.$",
@"^[\d]+ is a number for which we're missing a fact (submit one to numbersapi at google mail!)\.$",
};
foreach (String RegularExpression in RegularExpressionsArray) { if (Regex.IsMatch(Content, RegularExpression)) { IsRelevant = false; break; } }
return IsRelevant;
}
public static void HandleCrawlerRelevantResult(String Url, String Content)
{
CrawlerResultantFileWriteLine(Url);
CrawlerHitsBlockingCollection.Add(Url);
int ProcessedItems = CrawlerHitsBlockingCollection.Count + CrawlerMissesBlockingCollection.Count + CrawlerFailuresBlockingCollection.Count;
Console.WriteLine($"[Item #{ProcessedItems.ToString().PadRight(ItemsToProcess.ToString().Length)}] {Url.PadRight(MaxUrlLength)} is relevant");
}
public static void HandleCrawlerIrrelevantResult(String Url, String Content)
{
CrawlerMissesBlockingCollection.Add(Url);
int ProcessedItems = CrawlerHitsBlockingCollection.Count + CrawlerMissesBlockingCollection.Count + CrawlerFailuresBlockingCollection.Count;
Console.WriteLine($"[Item #{ProcessedItems.ToString().PadRight(ItemsToProcess.ToString().Length)}] {Url.PadRight(MaxUrlLength)} is irrelevant");
}
public static void FinalizeCrawler()
{
CrawlerStopWatch.Stop();
TimeSpan TimeSpan = TimeSpan.FromMilliseconds(CrawlerStopWatch.ElapsedMilliseconds);
String TimeLapseInformation = String.Format("{0:D2}h:{1:D2}m:{2:D2}s:{3:D3}ms",
TimeSpan.Hours,
TimeSpan.Minutes,
TimeSpan.Seconds,
TimeSpan.Milliseconds);
Console.WriteLine($"Crawling finished in {TimeLapseInformation}.");
Console.WriteLine($"<{CrawlerFailuresBlockingCollection.Count + CrawlerHitsBlockingCollection.Count + CrawlerMissesBlockingCollection.Count}> out of <{ItemsToProcess}> items have been crawled having <{CrawlerHitsBlockingCollection.Count}> relevant items, <{CrawlerMissesBlockingCollection.Count}> irrelevant items and <{CrawlerFailuresBlockingCollection.Count}> failures.");
Console.WriteLine("###############################");
}
//### Crawler Methods ###
//### Auxiliary Methods ###
public static void CrawlerResultantFileWriteLine(String Line)
{
CrawlerReaderWriterLockSlim.EnterWriteLock();
try { using (StreamWriter StreamWriter = File.AppendText(String.Concat(AppDomain.CurrentDomain.BaseDirectory, "\\", CrawlerResultantFileName))) { StreamWriter.WriteLine(Line); StreamWriter.Close(); } }
finally { CrawlerReaderWriterLockSlim.ExitWriteLock(); }
}
//### Auxiliary Methods ###
}
}
Вопросы
(Q1.) Является ли представленный подход оптимальным?
(Q2.) В функции FinalizeCrawler (), если количество коллективных элементов для обработки во всех потоках слишком велико, количество обработанных элементов не равно назначенному количеству элементов для обработки в InitializeCrawler Вызов (UrlHandle, ThreadsCount, ThreadRange) в main как:
(
CrawlerFailuresBlockingCollection.Count +
CrawlerHits.Count +
CrawlerMisses.Count ) должен быть равен (
ThreadsCount *
ThreadRange )
Некоторые элементы пропущены? если да, то почему?
(A2.) Как указал
Джошуа Робинсон , я использовал System.Collections.Generi c .Список небезопасной коллекции для изменения из нескольких потоков. Я переключился на System.Collections.Concurrent.BlockingCollection, протестировал его с большим числом, таким как 10000, и он заработал.
(Q3.) Как точно определить оптимальный
ThreadRange и
ThreadsCount комбинация в отношении производительности. Какой из них?
- (A) Минимальное количество потоков с максимальным диапазоном потоков
- (B) Низкое количество потоков с высоким диапазоном потоков
- (C) Средний диапазон потоков с количеством медианных потоков
- (B) High ThreadsCount с низким ThreadRange
- (E) Макс. ThreadsCount с min. ThreadRange
Если быть более сложным, если я хочу обработать 10000 элементов с этой конечной точки, это все возможные комбинации, удовлетворяющие необходимому предикату
ThreadsCount * ThreadRange = 10000 , что я выбрал только из них.
Что является наиболее оптимальным? И почему?
<1> ThreadsCount * <10000> ThreadRange
<2> ThreadsCount * <5000> ThreadRange
<4> ThreadsCount * <2500> ThreadRange
<5> ThreadsCount * <2000> ThreadRange
<8> ThreadsCount * <1250> ThreadRange
<10> ThreadsCount * <1000> ThreadRange
<16> ThreadsCount * <625> ThreadRange
<20> ThreadsCount * <500> ThreadRange
<25> ThreadsCount * <400> ThreadRange
<40> ThreadsCount * <250> ThreadRange
<50> ThreadsCount * <200> ThreadRange
<80> ThreadsCount * <125> ThreadRange
<100> ThreadsCount * <100> ThreadRange // Exact Median → square root of items to process if integer
<125> ThreadsCount * <80> ThreadRange
<200> ThreadsCount * <50> ThreadRange
<250> ThreadsCount * <40> ThreadRange
<400> ThreadsCount * <25> ThreadRange
<500> ThreadsCount * <20> ThreadRange
<625> ThreadsCount * <16> ThreadRange
<1000> ThreadsCount * <10> ThreadRange
<1250> ThreadsCount * <8> ThreadRange
<2000> ThreadsCount * <5> ThreadRange
<2500> ThreadsCount * <4> ThreadRange
<5000> ThreadsCount * <2> ThreadRange
<10000> ThreadsCount * <1> ThreadRange