Поток данных TPL для WebCrawler - PullRequest
       151

Поток данных TPL для WebCrawler

1 голос
/ 07 августа 2020

Я хочу создать веб-сканер, который будет загружать страницу, расположенную по некоторому URL-адресу, искать некоторые элементы, а затем создавать для нее результат, который будет готов для сохранения в БД. Но я хочу, чтобы эта часть БД сохранялась партиями.

Последняя часть состоит в том, что немного усложняет все это упражнение (по крайней мере, для моего текущего понимания потока данных TPL, история которого составляет 1 день;) ) Я знаю, что есть элемент BatchBlock, но сценарий, в котором я его видел, был простым, где это был первый шаг и "пакетирование" ввода, указанного в приложении (а не работы внутреннего конвейера). И я попытался поставить где-то внутри конвейера эта часть пакетной обработки, но я либо вынужден передать список URL-адресов на первый шаг (и тогда этап загрузки URL-адресов будет одним шагом, а другие шаги будут ждать, пока он не будет завершен), либо я могу передать один URL-адрес в конвейер, но затем отмечается пакетная обработка, поскольку из 1 URL-адреса есть один элемент синтаксического анализа для сохранения в БД :)

Это то, чего я хочу достичь:

enter image description here

What is important of course, that each download url is "independant" from other "download url" action. So once some page is downloaded it can instantly go to the webscrapping part. At once this is ready, it can instantly go to the phase of saving in DB (so waiting till batch of x elements comes - for example - 5) and then save it to DB.

Of course, I don't have to mention, that both "Download url" and "Webscrap neccessary data" transformation are async operations.

Maybe this is not something you can solve with TPL Dataflow? Please advice :)

[UPDATE - 07.08.2020 13:25]

Ok, yesterday I made a false assumption, that I post only one thing in the pipeline as the signature takes one string. That was clearly wrong assumption as I can just call it several times :)

I have more or less working examples, but two things are missing. Changing it to async and how to flush BatchBlock. Because if I have BatchBlock of size 3 and I send it to pipeline 8 URLs, I get a response only from the first 6.

Another issue with this example is .... that even without the need to flush (so i am sending 9 URLs and BatchBlock is 3) still the program runs indefinitely. Where is the issue?

Console.WriteLine($"Processing started: {DateTime.Now.ToString()}");
var workBuffer = new BatchBlock(3);
var downloadUrl = new TransformBlock(url =>
{
    Thread.Sleep(int.Parse(url.Last().ToString()) * 1000);
    return url;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

var parseContent = new TransformBlock(content =>
{
    Thread.Sleep(int.Parse(content.Last().ToString()) * 1000 / 2);
    return $"parsing result for: {content}";
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

var saveToDb = new TransformBlock(results =>
{
    Console.WriteLine($"results: {DateTime.Now.ToString()} {String.Join(", ", results)}");
    return true;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

downloadUrl.LinkTo(parseContent, new DataflowLinkOptions
{
    PropagateCompletion = true
});
parseContent.LinkTo(workBuffer, new DataflowLinkOptions
{
    PropagateCompletion = true
});
workBuffer.LinkTo(saveToDb, new DataflowLinkOptions
{
    PropagateCompletion = true
});

downloadUrl.Completion.ContinueWith(obj => parseContent.Complete());
parseContent.Completion.ContinueWith(obj => workBuffer.Complete());
workBuffer.Completion.ContinueWith(obj => saveToDb.Complete());

//last digit in string is treated as url download time (in seconds) and half of it is for processing time.  
downloadUrl.Post("http://some_site_to_parse.com2"); //downoading for this url is 2 sec, processing 1 sec. It will be ready to save to DB after 3 sec
downloadUrl.Post("http://some_site_to_parse.com3"); //downoading for this url is 3 sec, processing 1,5 sec. It will be ready to save to DB after 4,5 sec
downloadUrl.Post("http://some_site_to_parse.com4"); //downoading for this url is 4 sec, processing 2 sec. It will be ready to save to DB after 6 sec
//here should first batch be saved to DB after 6 seconds
downloadUrl.Post("http://some_site_to_parse.com5"); //downoading for this url is 5 sec, processing 2,5 sec. It will be ready to save to DB after 7,5 sec
downloadUrl.Post("http://some_site_to_parse.com6"); //downoading for this url is 6 sec, processing 3 sec. It will be ready to save to DB after 9 sec
downloadUrl.Post("http://some_site_to_parse.com7"); //downoading for this url is 7 sec, processing 3,5 sec. It will be ready to save to DB after 10,5 sec
//here should second batch be saved to DB after 10,5 seconds
downloadUrl.Post("http://some_site_to_parse.com8"); //downoading for this url is 8 sec, processing 4 sec. It will be ready to save to DB after 12 sec
downloadUrl.Post("http://some_site_to_parse.com9"); //downoading for this url is 9 sec, processing 4,5 sec. It will be ready to save to DB after 13,5 sec
downloadUrl.Post("http://some_site_to_parse.com10"); //downoading for this url is 10 sec, processing 5 sec. It will be ready to save to DB after 15 sec
//here should third batch be saved to DB after 15 seconds

downloadUrl.Complete();
saveToDb.Completion.Wait();

To summarize three questions:

  1. How to flush BatchBlock
  2. Why is this example app running indefinitely
  3. How to make it Async

[UPDATE 2 - 07.08.2020 14:28]

Somebody suggested that this is the solution to my problem: TPL Dataflow Transform block post to batch block, за которым следует блок действий

Но я добавил все , new DataflowLinkOptions { PropagateCompletion = true } и добавил workBuffer.Completion.ContinueWith(obj => saveToDb.Complete()); и он все еще не работает

Ответы [ 2 ]

2 голосов
/ 07 августа 2020

Я бы посоветовал вам взглянуть на Microsoft Reactive Framework (также известный как Rx), поскольку он делает такую ​​обработку очень простой.

Если я могу предположить, что у вас есть List<string> urls, и у вас есть следующие методы:

Task<string> DownloadUrlAsync(string url)
Task<string> WebscrapeAsync(string content)
Task SaveDataToDBAsync(IList<string> data)

... тогда вы можете сделать это с помощью Rx:

int buffer_size = 50;
IObservable<Unit> query =
    urls
        .ToObservable()
        .SelectMany(url => Observable.FromAsync(() => DownloadUrlAsync(url)))
        .SelectMany(content => Observable.FromAsync(() => WebscrapeAsync(content)))
        .Buffer(buffer_size)
        .SelectMany(buffer => Observable.FromAsync(() => SaveDataToDBAsync(buffer)));
        
IDisposable subscription = query.Subscribe();

Этот запрос обрабатывает все асинхронные c вызовы с использованием нескольких потоков, буферизуя содержимое и сохранение в базу данных.

Метод .Subscribe также имеет обратные вызовы для обработки значений по мере их создания, любого исключения и / или завершения.

Вам необходимо NuGet System.Reactive и добавьте using System.Reactive.Linq;, чтобы получить биты.

2 голосов
/ 07 августа 2020

Я думаю, это то, что вы пытаетесь сделать ...

Сначала создайте клиента, который будет использоваться всеми:

private static readonly HttpClient _client = new HttpClient(new HttpClientHandler
{
    AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate
});

Затем вот как я построил блоки и связал их:

const int maxDegreeOfParalleism = 10;

// first in, first out buffer block
var uriInputBlock = new BufferBlock<Uri>();

// transform block will download the data to string
var downloadHttpDataBlock = new TransformBlock<Uri, string>(async uri =>
{
    using(var msg = new HttpRequestMessage(HttpMethod.Get, uri))
    using(var resp = await _client.SendAsync(msg, HttpCompletionOption.ResponseHeadersRead))
    {
        return await resp.Content.ReadAsStringAsync().ConfigureAwait(false);
    }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParalleism });

// this block will take the data and scrape what it wants
var htmlScrapeBlock = new TransformBlock<string, string[]>(data =>
{
    var doc = new HtmlAgilityPack.HtmlDocument();
    doc.LoadHtml(data);
    return doc.DocumentNode.SelectNodes("//a[@href]").
        Select(x => x.GetAttributeValue("href", string.Empty)).ToArray();
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParalleism });

// take in arrays and send them out as single elements
var manyToOneBlock = new TransformManyBlock<string[], string>(x => x);

// output data to a batch block with grouping of 10
var outputDataBlcok = new BatchBlock<string>(10);

// final block to store it somewhere
var databaseBlock = new ActionBlock<string[]>(x =>
{
    Console.WriteLine($"Group of {x.Length} items to be processed:");
    foreach (var uri in x)
    {
        Console.WriteLine($"Store this: {uri}");
    }
});

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
uriInputBlock.LinkTo(downloadHttpDataBlock, linkOptions);
downloadHttpDataBlock.LinkTo(htmlScrapeBlock, linkOptions);
htmlScrapeBlock.LinkTo(manyToOneBlock, linkOptions);
manyToOneBlock.LinkTo(outputDataBlcok, linkOptions);
outputDataBlcok.LinkTo(databaseBlock, linkOptions);

uriInputBlock.Post(new Uri("https://stackoverflow.com"));
uriInputBlock.Post(new Uri("https://google.com"));
uriInputBlock.Post(new Uri("https://yahoo.com"));
uriInputBlock.Post(new Uri("https://example.com"));

// When you want to complete/close down the pipeline, call this
uriInputBlock.Complete();
// you can wait for all data to finish propogating by calling this:
databaseBlock.Completion.Wait();

Это просто базовая c концепция, очевидно, вы можете сделать это намного лучше, но это должно помочь вам начать работу. Подробнее о множестве разных блоков здесь .

...