Загрузка HTML-страниц одновременно с использованием Async CTP - PullRequest
3 голосов
/ 13 февраля 2012

Попытка написать сканер HTML с использованием Async CTP Я застрял в том, как написать безрекурсный метод для этого.

Это код, который у меня пока есть.

private readonly ConcurrentStack<LinkItem> _LinkStack;
private readonly Int32 _MaxStackSize;
private readonly WebClient client = new WebClient();

Func<string, string, Task<List<LinkItem>>> DownloadFromLink = async (BaseURL, uri) => 
{
    string html = await client.DownloadStringTaskAsync(uri);
    return LinkFinder.Find(html, BaseURL);
};

Action<LinkItem> DownloadAndPush = async (o) => 
{
    List<LinkItem> result = await DownloadFromLink(o.BaseURL, o.Href);
    if (this._LinkStack.Count() + result.Count <= this._MaxStackSize)
    {
        this._LinkStack.PushRange(result.ToArray());
        o.Processed = true;
    }  
};

Parallel.ForEach(this._LinkStack, (o) => 
{
    DownloadAndPush(o);
});

Но, очевидно, это не работает, как я надеюсь, потому что в то время, когда Parallel.ForEach выполняет первую (и единственную итерацию), у меня есть только 1 элемент.Самый простой подход, который я могу придумать, - сделать ForEach рекурсивным, но я не могу (я не думаю) сделать это, так как я бы быстро исчерпал пространство стека.как я могу реструктурировать этот код, чтобы создать то, что я бы описал как рекурсивное продолжение, которое добавляет элементы до тех пор, пока не будет достигнут MaxStackSize или система не исчерпает память?

1 Ответ

10 голосов
/ 13 февраля 2012

Я думаю, что лучший способ сделать что-то подобное с использованием C # 5 / .Net 4.5 - это использовать TPL Dataflow . Существует даже пошаговое руководство по реализации веб-сканера с его использованием .

По сути, вы создаете один «блок», который заботится о загрузке одного URL и получении ссылки с него:

var cts = new CancellationTokenSource();

Func<LinkItem, Task<IEnumerable<LinkItem>>> downloadFromLink =
    async link =>
            {
                // WebClient is not guaranteed to be thread-safe,
                // so we shouldn't use one shared instance
                var client = new WebClient();
                string html = await client.DownloadStringTaskAsync(link.Href);

                return LinkFinder.Find(html, link.BaseURL);
            };

var linkFinderBlock = new TransformManyBlock<LinkItem, LinkItem>(
    downloadFromLink,
    new ExecutionDataflowBlockOptions
    { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token });

Вы можете установить MaxDegreeOfParallelism на любое значение, которое вы хотите. Максимальное количество одновременных загрузок URL-адресов. Если вы вообще не хотите ограничивать его, вы можете установить его на DataflowBlockOptions.Unbounded.

Затем вы создаете один блок, который каким-то образом обрабатывает все загруженные ссылки, например, хранит их все в списке. Он также может решить, когда отменить загрузку:

var links = new List<LinkItem>();

var storeBlock = new ActionBlock<LinkItem>(
    linkItem =>
    {
        links.Add(linkItem);
        if (links.Count == maxSize)
            cts.Cancel();
    });

Поскольку мы не установили MaxDegreeOfParallelism, по умолчанию он равен 1. Это означает, что использование коллекции, которая не является поточно-ориентированной, должно быть в порядке здесь.

Мы создаем еще один блок: он возьмет ссылку с linkFinderBlock и передаст ее как storeBlock, так и обратно linkFinderBlock.

var broadcastBlock = new BroadcastBlock<LinkItem>(li => li);

Лямбда в своем конструкторе является «функцией клонирования». Вы можете использовать его для создания клона предмета, если хотите, но в этом нет необходимости, поскольку мы не изменяем LinkItem после создания.

Теперь мы можем соединить блоки вместе:

linkFinderBlock.LinkTo(broadcastBlock);
broadcastBlock.LinkTo(storeBlock);
broadcastBlock.LinkTo(linkFinderBlock);

Затем мы можем начать обработку, передав первый элемент linkFinderBlock (или broadcastBlock, если вы также хотите отправить его по storeBlock):

linkFinderBlock.Post(firstItem);

И, наконец, дождитесь завершения обработки:

try
{
    linkFinderBlock.Completion.Wait();
}
catch (AggregateException ex)
{
    if (!(ex.InnerException is TaskCanceledException))
        throw;
}
...