Возвращая BlockingCollection как IEnumerable из метода - PullRequest
2 голосов
/ 22 февраля 2012

Я пытаюсь вернуть IEnumerable из метода, который поддерживается BlockingCollection.Шаблон кода:

public IEnumerable<T> Execute() {   
    var results = new BlockingCollection<T>(10);  
    _ExecuteLoad(results);   
    return results.GetConsumingEnumerable(); 
}

private void _ExecuteLoad<T>(BlockingCollection<T> results) {
    var loadTask = Task.Factory.StartNew(() =>
    { 
        //some async code that adds items to results
        results.CompleteAdding();
    });
}

public void Consumer() {
    var count = Execute().Count();
}

Проблема в том, что перечисляемое значение, возвращаемое функцией Execute (), всегда пусто.Все примеры, которые я видел, повторяют коллекцию BlockingCollection в задаче.Кажется, что в этой ситуации это не сработает.

Кто-нибудь знает, где я ошибаюсь?


Чтобы сделать вещи немного более понятными, я вставил код, который я выполняю, чтобы заполнить коллекцию.Возможно, здесь есть что-то, что вызывает проблему?

Task.Factory.StartNew(() =>
{
    var continuationRowKey = "";
    var continuationParitionKey = "";
    var action = HttpMethod.Get;
    var queryUri = _GetTableQueryUri(tableServiceUri, tableName, query, continuationParitionKey, continuationRowKey, timeout);
    while (true)
    {
        using (var request = GetRequest(queryUri, null, action.Method, azureAccountName, azureAccountKey))
        {
            request.Method = action;
            request.RequestUri = queryUri;

            using (var client = new HttpClient())
            {
                var sendTask = client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
                using (var response = sendTask.Result)
                {
                    continuationParitionKey = // stuff from headers
                    continuationRowKey = // stuff from headers

                    var streamTask = response.Content.ReadAsStreamAsync();
                    using (var stream = streamTask.Result)
                    {
                        using (var reader = XmlReader.Create(stream))
                        {
                            while (reader.Read())
                            {
                                if (reader.NodeType == XmlNodeType.Element && reader.Name == "entry" && reader.NamespaceURI == "http://www.w3.org/2005/Atom")
                                {
                                    results.Add(XNode.ReadFrom(reader) as XElement);
                                }
                            }
                            reader.Close();
                        }
                    }
                }
            }

            if (continuationParitionKey == null && continuationRowKey == null)
                break;

            queryUri = _GetTableQueryUri(tableServiceUri, tableName, query, continuationParitionKey, continuationRowKey, timeout);
        }
    }
    results.CompleteAdding();
});

1 Ответ

3 голосов
/ 22 февраля 2012

Вам нужно позвонить results.CompleteAdding(), когда вы закончите добавлять элементы в коллекцию.

Если вы этого не сделаете, перечисление никогда не закончится и Count() никогда не вернется.

Кроме того, указанный вами код верен.

...