Я использую BlockingCollection
для реализации шаблона производитель-потребитель в C # 4.0.
BlockingCollection
содержит элементы, которые занимают довольно много памяти. Я бы хотел, чтобы продюсер одновременно брал один элемент из BlockingCollection и обрабатывал его.
Я думал, что при использовании foreach на BlockingCollection.GetConsumingEnumerable()
каждый раз BlockingCollection
удаляет элемент из базовой очереди (что означает все вместе со ссылкой), поэтому в конце метода Process (), который обрабатывает элемент, элемент может быть собран мусором.
Но это не так. Похоже, цикл foreach на BlockingCollection.GetConsumingEnumerable()
содержит все ссылки на элементы, введенные в очередь. Все предметы удерживаются (таким образом, предотвращается сбор мусора) до выхода из цикла foreach.
Вместо использования простого цикла foreach на BlockingCollection.GetConsumingEnumerable()
я использую флаг while, проверяющий флаг BlockingCollection.IsComplete, а внутри цикла я использую BlockingCollection.Take()
для захвата расходуемого элемента. Я бы предположил, что BlockingCollection.Take()
имеет эффект, аналогичный List.Remove()
, который удалит ссылку на элемент из BlockingCollection. Но опять же это неправильно. Все предметы являются только мусором, собранным вне цикла while.
Итак, мой вопрос: как мы можем легко реализовать требование, чтобы BlockingCollection потенциально содержал элементы, потребляющие память, и каждый элемент можно было собирать мусором после его потребления потребителем? Большое спасибо за любую помощь.
РЕДАКТИРОВАТЬ: по запросу добавлен простой демонстрационный код:
// Entity is what we are going to process.
// The finalizer will tell us when Entity is going to be garbage collected.
class Entity
{
private static int counter_;
private int id_;
public int ID { get{ return id_; } }
public Entity() { id_ = counter++; }
~Entity() { Console.WriteLine("Destroying entity {0}.", id_); }
}
...
private BlockingCollection<Entity> jobQueue_ = new BlockingCollection<Entity>();
private List<Task> tasks_ = new List<Task>();
// This is the method to launch and wait for the tasks to finish the work.
void Run()
{
tasks_.Add(Task.Factory.StartNew(ProduceEntity);
Console.WriteLine("Start processing.");
tasks_.Add(Task.Factory.StartNew(ConsumeEntity);
Task.WaitAll(tasks_.ToArray());
}
// The producer creates Entity instances and add them to BlockingCollection.
void ProduceEntity()
{
for(int i = 0; i < 10; i ++) // We are adding totally 10 entities.
{
var newEntity = new Entity();
Console.WriteLine("Create entity {0}.", newEntity.ID);
jobQueue_.Add(newEntity);
}
jobQueue_.CompleteAdding();
}
// The consumer takes entity, process it (and what I need: destroy it).
void ConsumeEntity()
{
while(!jobQueue_.IsCompleted){
Entity entity;
if(jobQueue_.TryTake(entity))
{
Console.WriteLine("Process entity {0}.", entity.ID);
entity = null;
// I would assume after GC, the entity will be finalized and garbage collected, but NOT.
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
}
}
Console.WriteLine("Finish processing.");
}
Выводом является то, что все сообщения создания и обработки сопровождаются «Завершить обработку». и сопровождается всеми сообщениями об уничтожении от сущностей. И сообщение создания сущностей, показывающее Entity.ID от 0 до 9 и сообщения уничтожения, показывающие Entity.ID от 9 до 0.
EDIT:
Даже когда я устанавливаю ограниченную емкость BlockingCollection, все элементы, входящие в него, завершаются только при выходе из цикла, что странно.