I want to build a web crawler that downloads the page at a given URL, looks for certain elements and then produces a result for that page which is ready to be saved in a DB. But I want the DB-saving part to happen in batches.
The last part is what makes this whole exercise a bit complicated (at least for my current understanding of TPL Dataflow, which is about 1 day old ;) ). I know there is a BatchBlock, but the scenario I saw it in was simple: it was the first step and it "batched" the input given to the application, not the work of an inner pipeline. I have tried to put this batching part somewhere inside the pipeline, but then I am either forced to pass a list of URLs to the first step (and then the URL-download phase becomes one big step, with the other steps waiting until it is finished), or I can pass a single URL into the pipeline, but then there is nothing to batch, since one URL yields only one parsed element to save in the DB :)
This is what I want to achieve:
![enter image description here](https://i.stack.imgur.com/oLdDY.png)
What is important, of course, is that each "download url" action is independent of the others. So once a page is downloaded, it can instantly go to the web-scraping part. Once that is ready, it can instantly go to the save-to-DB phase (i.e. wait until a batch of x elements - for example 5 - has accumulated) and then be saved to the DB.
Of course, I don't have to mention that both the "Download url" and "Webscrap neccessary data" transformations are async operations.
Maybe this is not something you can solve with TPL Dataflow? Please advise :)
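To make the shape concrete, here is a rough sketch of the pipeline I have in mind (the block bodies, the HttpClient usage and the batch size of 5 are only placeholders and assumptions for illustration, not my real downloader or scraper):
using System;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

var httpClient = new HttpClient();

// "Download url" - every URL is independent, so downloads can run in parallel
var download = new TransformBlock<string, string>(
    async url => await httpClient.GetStringAsync(url),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

// "Webscrap neccessary data" - placeholder parsing, the real scraper is also async
var scrape = new TransformBlock<string, string>(
    html => $"scraped {html.Length} characters");

// group scraped results into packs of 5 before touching the DB
var batch = new BatchBlock<string>(5);

// "Save in DB" - one save call per batch
var save = new ActionBlock<string[]>(
    results => Console.WriteLine($"saving a batch of {results.Length} items"));

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
download.LinkTo(scrape, linkOptions);
scrape.LinkTo(batch, linkOptions);
batch.LinkTo(save, linkOptions);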
[UPDATE - 07.08.2020 13:25]
Ok, yesterday I made a false assumption that I can post only one thing into the pipeline because the signature takes a single string. That was clearly wrong, as I can just call Post several times :)
I now have a more or less working example, but two things are missing: changing it to async, and how to flush the BatchBlock. Because if I have a BatchBlock of size 3 and I send 8 URLs into the pipeline, I get a response only for the first 6.
Another issue with this example is that even when no flush is needed (so I am sending 9 URLs and the BatchBlock size is 3), the program still runs indefinitely. Where is the issue?
Console.WriteLine($"Processing started: {DateTime.Now.ToString()}");
var workBuffer = new BatchBlock(3);
var downloadUrl = new TransformBlock(url =>
{
Thread.Sleep(int.Parse(url.Last().ToString()) * 1000);
return url;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
var parseContent = new TransformBlock(content =>
{
Thread.Sleep(int.Parse(content.Last().ToString()) * 1000 / 2);
return $"parsing result for: {content}";
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
var saveToDb = new TransformBlock(results =>
{
Console.WriteLine($"results: {DateTime.Now.ToString()} {String.Join(", ", results)}");
return true;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
downloadUrl.LinkTo(parseContent, new DataflowLinkOptions
{
PropagateCompletion = true
});
parseContent.LinkTo(workBuffer, new DataflowLinkOptions
{
PropagateCompletion = true
});
workBuffer.LinkTo(saveToDb, new DataflowLinkOptions
{
PropagateCompletion = true
});
downloadUrl.Completion.ContinueWith(obj => parseContent.Complete());
parseContent.Completion.ContinueWith(obj => workBuffer.Complete());
workBuffer.Completion.ContinueWith(obj => saveToDb.Complete());
//the last digit in the string is treated as the url download time (in seconds) and half of it as the processing time.
downloadUrl.Post("http://some_site_to_parse.com2"); //downloading this url takes 2 sec, processing 1 sec. It will be ready to save to DB after 3 sec
downloadUrl.Post("http://some_site_to_parse.com3"); //downloading this url takes 3 sec, processing 1.5 sec. It will be ready to save to DB after 4.5 sec
downloadUrl.Post("http://some_site_to_parse.com4"); //downloading this url takes 4 sec, processing 2 sec. It will be ready to save to DB after 6 sec
//here the first batch should be saved to DB after 6 seconds
downloadUrl.Post("http://some_site_to_parse.com5"); //downloading this url takes 5 sec, processing 2.5 sec. It will be ready to save to DB after 7.5 sec
downloadUrl.Post("http://some_site_to_parse.com6"); //downloading this url takes 6 sec, processing 3 sec. It will be ready to save to DB after 9 sec
downloadUrl.Post("http://some_site_to_parse.com7"); //downloading this url takes 7 sec, processing 3.5 sec. It will be ready to save to DB after 10.5 sec
//here the second batch should be saved to DB after 10.5 seconds
downloadUrl.Post("http://some_site_to_parse.com8"); //downloading this url takes 8 sec, processing 4 sec. It will be ready to save to DB after 12 sec
downloadUrl.Post("http://some_site_to_parse.com9"); //downloading this url takes 9 sec, processing 4.5 sec. It will be ready to save to DB after 13.5 sec
downloadUrl.Post("http://some_site_to_parse.com10"); //downloading this url takes 10 sec, processing 5 sec. It will be ready to save to DB after 15 sec
//here the third batch should be saved to DB after 15 seconds
downloadUrl.Complete();
saveToDb.Completion.Wait();
To summarize, three questions:
- How to flush BatchBlock
- Why is this example app running indefinitely
- How to make it Async
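For the last point, this is roughly what I mean by "making it async": e.g. the download block rewritten with an async lambda and Task.Delay instead of Thread.Sleep, and awaiting completion at the end instead of blocking on it (just my assumption of how the async version should look):
var downloadUrl = new TransformBlock<string, string>(async url =>
{
    // simulate the download asynchronously instead of blocking a thread
    await Task.Delay(int.Parse(url.Last().ToString()) * 1000);
    return url;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

// ...

// instead of saveToDb.Completion.Wait();
await saveToDb.Completion;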
[UPDATE 2 - 07.08.2020 14:28]
Somebody suggested that this is the solution to my problem: TPL Dataflow Transform block post to batch block followed by an action block.
But I have added all the `, new DataflowLinkOptions { PropagateCompletion = true }` options
and added `workBuffer.Completion.ContinueWith(obj => saveToDb.Complete());`
and it still does not work.
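Concretely, these are the two changes from that suggestion, exactly as they appear in the code above:
// every link now propagates completion downstream:
parseContent.LinkTo(workBuffer, new DataflowLinkOptions { PropagateCompletion = true });
workBuffer.LinkTo(saveToDb, new DataflowLinkOptions { PropagateCompletion = true });

// plus the explicit completion hand-off:
workBuffer.Completion.ContinueWith(obj => saveToDb.Complete());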