Я думаю, вы и I описывают одно и то же в комментариях. Тем не менее, нет необходимости создавать такой «специализированный IEnumerable
», потому что BlockingCollection<>
класс уже существует для такого сценария производитель-потребитель ios. Вы бы использовали его следующим образом ...
- Создать
BlockingCollection<>
для каждой функции-потребителя (т.е. tt1
и tt2
). - По умолчанию
BlockingCollection<>
оборачивает ConcurrentQueue<>
, поэтому элементы поступят в порядке FIFO. - Чтобы удовлетворить ваше требование, чтобы удерживался только один элемент в памяти одновременно,
1
будет указано для ограниченной емкости . Обратите внимание, что эта емкость указана для каждой коллекции, поэтому в двух коллекциях в любой момент времени может быть до двух элементов в очереди. - Каждая коллекция будет содержать элементы ввода для этого потребителя.
- Создать поток / задачу для каждой потребляющей функции.
- Поток / задача просто вызовет
GetConsumingEnumerator()
для своей коллекции входных данных, передаст полученный IEnumerable<>
своей функции-потребителю и вернет этот результат. GetConsumingEnumerable()
делает то, что следует из его названия: он создает IEnumerable<>
, который потребляет (удаляет) элементы из коллекции. Если коллекция пуста, перечисление будет блокироваться, пока элемент не будет добавлен. CompleteAdding()
вызывается после завершения работы производителя, что позволяет счетчику-потребителю завершать работу при опустошении коллекции.
- Производитель перечисляет
IEnumerable<>
, tt
и добавляет каждый элемент в обе коллекции. Это единственный раз, когда tt
перечисляется. BlockingCollection<>.Add()
будет заблокирован, если коллекция достигнет своей емкости, предотвращая буферизацию всего tt
в памяти.
- После полного перечисления
tt
для каждой коллекции вызывается CompleteAdding()
. - После завершения каждого потока / задачи потребителя возвращаются их результаты.
Вот что это выглядит как в коде ...
public static (S1, S2) Both<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> tt1, Func<IEnumerable<T>, S2> tt2)
{
const int MaxQueuedElementsPerCollection = 1;
using (BlockingCollection<T> collection1 = new BlockingCollection<T>(MaxQueuedElementsPerCollection))
using (Task<S1> task1 = StartConsumerTask(collection1, tt1))
using (BlockingCollection<T> collection2 = new BlockingCollection<T>(MaxQueuedElementsPerCollection))
using (Task<S2> task2 = StartConsumerTask(collection2, tt2))
{
foreach (T element in tt)
{
collection1.Add(element);
collection2.Add(element);
}
// Inform any enumerators created by .GetConsumingEnumerable()
// that there will be no more elements added.
collection1.CompleteAdding();
collection2.CompleteAdding();
// Accessing the Result property blocks until the Task<> is complete.
return (task1.Result, task2.Result);
}
Task<S> StartConsumerTask<S>(BlockingCollection<T> collection, Func<IEnumerable<T>, S> func)
{
return Task.Run(() => func(collection.GetConsumingEnumerable()));
}
}
Обратите внимание, что ради эффективности вы можете увеличить MaxQueuedElementsPerCollection
, скажем, до 10
или 100
, чтобы потребители не должны были работать в режиме блокировки друг с другом.
Существует одна проблема с этим кодом . Когда коллекция пуста, потребитель должен ждать, пока производитель произведет элемент, а когда коллекция заполнена, производитель должен ждать, пока потребитель потребит элемент. Подумайте, что происходит на полпути при выполнении вашей tt => tt.Any(k => k > 5)
лямбды ...
- Производитель ожидает, что коллекция не будет заполнена, и добавляет
5
. - Потребитель ждет, пока коллекция не будет пустой, и удаляет
5
. 5 > 5
возвращает false
и перечисление продолжается.
- Производитель ожидает, что коллекция не будет заполнена, и добавляет
6
. - Потребитель ожидает, пока коллекция не будет пустой, и удаляет
6
. 6 > 5
возвращает true
и перечисление останавливается. Any()
, лямбда и потребительская задача возвращаются.
- Производитель ожидает, что коллекция не будет заполнена, и добавляет
7
. - продюсер ожидает, что коллекция не будет полной и ... этого никогда не случится!
- Потребитель уже отказался от перечисления, поэтому он не потребляет никаких элементов, чтобы освободить место для нового.
Add()
никогда не вернется.
Самый простой способ, которым я мог бы придумать, чтобы предотвратить этот тупик, - это обеспечить перечисление всей коллекции, даже если func
не Сделай так. Это просто требует простого изменения StartConsumerTask<>()
локального метода ...
Task<S> StartConsumerTask<S>(BlockingCollection<T> collection, Func<IEnumerable<T>, S> func)
{
return Task.Run(
() => {
try
{
return func(collection.GetConsumingEnumerable());
}
finally
{
// Prevent BlockingCollection<>.Add() calls from
// deadlocking by ensuring the entire collection gets
// consumed even if func abandoned its enumeration early.
foreach (T element in collection.GetConsumingEnumerable())
{
// Do nothing...
}
}
}
);
}
Недостатком этого является то, что tt
всегда будет перечисляться до завершения, даже если оба tt1
и tt2
рано покидают свои счетчики.
С учетом этого, это ...
static void Main()
{
IEnumerable<int> mynums = Enumerable.Range(0, 10).Trace("mynums:");
Console.WriteLine("Both: (any > 5, sum) = {0}", mynums.Both(tt => tt.Any(k => k > 5), tt => tt.Sum()));
}
... выводит это ...
mynums: 0 1 2 3 4 5 6 7 8 9.
Both: (any > 5, sum) = (True, 45)