BlockingCollection в Task Parallel Library не освобождает автоматически ссылку на базовые экземпляры - PullRequest
6 голосов
/ 21 октября 2010

Я использую BlockingCollection для реализации шаблона производитель-потребитель в C # 4.0.

BlockingCollection содержит элементы, которые занимают довольно много памяти. Я бы хотел, чтобы продюсер одновременно брал один элемент из BlockingCollection и обрабатывал его.

Я думал, что при использовании foreach на BlockingCollection.GetConsumingEnumerable() каждый раз BlockingCollection удаляет элемент из базовой очереди (что означает все вместе со ссылкой), поэтому в конце метода Process (), который обрабатывает элемент, элемент может быть собран мусором.

Но это не так. Похоже, цикл foreach на BlockingCollection.GetConsumingEnumerable() содержит все ссылки на элементы, введенные в очередь. Все предметы удерживаются (таким образом, предотвращается сбор мусора) до выхода из цикла foreach.

Вместо использования простого цикла foreach на BlockingCollection.GetConsumingEnumerable() я использую флаг while, проверяющий флаг BlockingCollection.IsComplete, а внутри цикла я использую BlockingCollection.Take() для захвата расходуемого элемента. Я бы предположил, что BlockingCollection.Take() имеет эффект, аналогичный List.Remove(), который удалит ссылку на элемент из BlockingCollection. Но опять же это неправильно. Все предметы являются только мусором, собранным вне цикла while.

Итак, мой вопрос: как мы можем легко реализовать требование, чтобы BlockingCollection потенциально содержал элементы, потребляющие память, и каждый элемент можно было собирать мусором после его потребления потребителем? Большое спасибо за любую помощь.

РЕДАКТИРОВАТЬ: по запросу добавлен простой демонстрационный код:

// Entity is what we are going to process.
// The finalizer will tell us when Entity is going to be garbage collected.
class Entity
{
    private static int counter_;
    private int id_;
    public int ID { get{ return id_; } }
    public Entity() { id_ = counter++; }
    ~Entity() { Console.WriteLine("Destroying entity {0}.", id_); }
}

...

private BlockingCollection<Entity> jobQueue_ = new BlockingCollection<Entity>();
private List<Task> tasks_ = new List<Task>();

// This is the method to launch and wait for the tasks to finish the work.
void Run()
{
    tasks_.Add(Task.Factory.StartNew(ProduceEntity);
    Console.WriteLine("Start processing.");
    tasks_.Add(Task.Factory.StartNew(ConsumeEntity);
    Task.WaitAll(tasks_.ToArray());
}

// The producer creates Entity instances and add them to BlockingCollection.
void ProduceEntity()
{
    for(int i = 0; i < 10; i ++) // We are adding totally 10 entities.
    {
        var newEntity = new Entity();
        Console.WriteLine("Create entity {0}.", newEntity.ID);
        jobQueue_.Add(newEntity);
    }
    jobQueue_.CompleteAdding();
}

// The consumer takes entity, process it (and what I need: destroy it).
void ConsumeEntity()
{
    while(!jobQueue_.IsCompleted){
        Entity entity;
        if(jobQueue_.TryTake(entity))
        {
            Console.WriteLine("Process entity {0}.", entity.ID);
            entity = null;

            // I would assume after GC, the entity will be finalized and garbage collected, but NOT.
            GC.Collect();
            GC.WaitForPendingFinalizers();
            GC.Collect();
        }
    }
    Console.WriteLine("Finish processing.");
}

Выводом является то, что все сообщения создания и обработки сопровождаются «Завершить обработку». и сопровождается всеми сообщениями об уничтожении от сущностей. И сообщение создания сущностей, показывающее Entity.ID от 0 до 9 и сообщения уничтожения, показывающие Entity.ID от 9 до 0.

EDIT:

Даже когда я устанавливаю ограниченную емкость BlockingCollection, все элементы, входящие в него, завершаются только при выходе из цикла, что странно.

Ответы [ 3 ]

6 голосов
/ 08 ноября 2010

ConcurrentQueue содержит сегменты с внутренним массивом из 32 элементов.Элементы сущности не будут собираться мусором до тех пор, пока сегмент не будет собран мусором.Это произойдет после того, как все 32 элемента будут удалены из очереди.Если вы измените свой пример, добавив 32 элемента, вы увидите сообщения «Уничтожение объекта» перед «Завершить обработку».

2 голосов
/ 21 октября 2010

Будет ли BlockingCollection продолжать хранить ссылки, зависит от типа коллекции, которую он использует.

Тип коллекции по умолчанию для BlockingCollection<T> равен ConcurrentQueue<T>.

Таким образом, поведение сборки мусора будет зависеть от типа сбора. В случае ConcurrentQueue<T> это структура FIFO, поэтому я был бы очень удивлен, если бы это не освободило ссылки из структуры данных после того, как они были удалены из очереди (это своего рода определение очереди)!

Как именно вы определяете, что объекты не собираются мусором?

...