Использование IEnumerable несколько раз за один проход - PullRequest
0 голосов
/ 01 апреля 2020

Можно ли написать функцию более высокого порядка, которая вызывает многократное использование IEnumerable, но только за один проход и без чтения всех данных в память? [ См. Редактирование ниже для разъяснения того, что я ищу. ]

Например, в приведенном ниже коде перечисляемое значение равно mynums (на котором я пометил .Trace(), чтобы увидеть, сколько раз мы его перечисляем). Цель состоит в том, чтобы выяснить, имеет ли оно какие-либо числа больше 5, а также сумму всех чисел. Функция, которая обрабатывает перечислимое дважды, равна Both_TwoPass, но она перечисляет его дважды. В отличие от этого Both_NonStream перечисляет его только один раз, но за счет чтения в память. В принципе можно выполнить обе эти задачи за один проход и в потоковом режиме, как показано Any5Sum, но это конкретное решение c. Можно ли написать функцию с такой же сигнатурой, как у Both_*, но это лучшее из обоих миров?

(Мне кажется, это должно быть возможно с помощью потоков. Есть ли лучшее решение, использующее, скажем, async?)

Редактировать

Ниже приведено пояснение относительно того, что я ищу. Я включил очень простое описание каждого свойства в квадратных скобках.

Я ищу функцию Both со следующими характеристиками:

  1. Имеет сигнатуру (S1, S2) Both<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1>, Func<IEnumerable<T>, S2>) (и выдает "правильный" вывод!)
  2. Он повторяет только первый аргумент, tt, один раз. [Под этим я подразумеваю, что при пропуске mynums (как определено ниже) он выдает только mynums: 0 1 2 ... один раз . Это исключает функцию Both_TwoPass.]
  3. . Она обрабатывает данные из первого аргумента tt в потоковом режиме. [Под этим я подразумеваю, что, например, недостаточно памяти для хранения всех элементов из tt в памяти одновременно, что исключает функцию Both_NonStream.]
using System;
using System.Collections.Generic;
using System.Linq;

namespace ConsoleApp
{
    static class Extensions
    {
        public static IEnumerable<T> Trace<T>(this IEnumerable<T> tt, string msg = "")
        {
            Console.Write(msg);
            try
            {
                foreach (T t in tt)
                {
                    Console.Write(" {0}", t);
                    yield return t;
                }
            }
            finally
            {
                Console.WriteLine('.');
            }
        }

        public static (S1, S2) Both_TwoPass<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> f1, Func<IEnumerable<T>, S2> f2)
        {
            return (f1(tt), f2(tt));
        }

        public static (S1, S2) Both_NonStream<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> f1, Func<IEnumerable<T>, S2> f2)
        {
            var tt2 = tt.ToList();
            return (f1(tt2), f2(tt2));
        }

        public static (bool, int) Any5Sum(this IEnumerable<int> ii)
        {
            int sum = 0;
            bool any5 = false;
            foreach (int i in ii)
            {
                sum += i;
                any5 |= i > 5; // or: if (!any5) any5 = i > 5;
            }
            return (any5, sum);
        }

    }
    class Program
    {
        static void Main()
        {
            var mynums = Enumerable.Range(0, 10).Trace("mynums:");
            Console.WriteLine("TwoPass: (any > 5, sum) = {0}", mynums.Both_TwoPass(tt => tt.Any(k => k > 5), tt => tt.Sum()));
            Console.WriteLine("NonStream: (any > 5, sum) = {0}", mynums.Both_NonStream(tt => tt.Any(k => k > 5), tt => tt.Sum()));
            Console.WriteLine("Manual: (any > 5, sum) = {0}", mynums.Any5Sum());
        }
    }
}

Ответы [ 3 ]

4 голосов
/ 01 апреля 2020

Как вы написали свою вычислительную модель (т.е. return (f1(tt), f2(tt))), нет способа избежать нескольких итераций вашего перечислимого. Вы в основном говорите, что вычислите Item1, а затем вычислите Item2.

Вам нужно либо изменить модель с (Func<IEnumerable<T>, S1>, Func<IEnumerable<T>, S2>) на (Func<T, S1>, Func<T, S2>), либо на Func<IEnumerable<T>, (S1, S2)>, чтобы иметь возможность выполнять вычисления параллельно .

Ваша реализация Any5Sum - это в основном второй подход (Func<IEnumerable<T>, (S1, S2)>). Но для этого уже есть встроенный метод.

Попробуйте:

Console.WriteLine("Aggregate: (any > 5, sum) = {0}",
    mynums
        .Aggregate<int, (bool any5, int sum)>(
            (false, 0),
            (a, x) => (a.any5 | x > 5, a.sum + x)));
2 голосов
/ 03 апреля 2020

Я думаю, вы и I описывают одно и то же в комментариях. Тем не менее, нет необходимости создавать такой «специализированный IEnumerable», потому что BlockingCollection<> класс уже существует для такого сценария производитель-потребитель ios. Вы бы использовали его следующим образом ...

  • Создать BlockingCollection<> для каждой функции-потребителя (т.е. tt1 и tt2).
    • По умолчанию BlockingCollection<> оборачивает ConcurrentQueue<>, поэтому элементы поступят в порядке FIFO.
    • Чтобы удовлетворить ваше требование, чтобы удерживался только один элемент в памяти одновременно, 1 будет указано для ограниченной емкости . Обратите внимание, что эта емкость указана для каждой коллекции, поэтому в двух коллекциях в любой момент времени может быть до двух элементов в очереди.
    • Каждая коллекция будет содержать элементы ввода для этого потребителя.
  • Создать поток / задачу для каждой потребляющей функции.
    • Поток / задача просто вызовет GetConsumingEnumerator() для своей коллекции входных данных, передаст полученный IEnumerable<> своей функции-потребителю и вернет этот результат.
      • GetConsumingEnumerable() делает то, что следует из его названия: он создает IEnumerable<>, который потребляет (удаляет) элементы из коллекции. Если коллекция пуста, перечисление будет блокироваться, пока элемент не будет добавлен. CompleteAdding() вызывается после завершения работы производителя, что позволяет счетчику-потребителю завершать работу при опустошении коллекции.
  • Производитель перечисляет IEnumerable<>, tt и добавляет каждый элемент в обе коллекции. Это единственный раз, когда tt перечисляется.
    • BlockingCollection<>.Add() будет заблокирован, если коллекция достигнет своей емкости, предотвращая буферизацию всего tt в памяти.
  • После полного перечисления tt для каждой коллекции вызывается CompleteAdding().
  • После завершения каждого потока / задачи потребителя возвращаются их результаты.

Вот что это выглядит как в коде ...

public static (S1, S2) Both<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> tt1, Func<IEnumerable<T>, S2> tt2)
{
    const int MaxQueuedElementsPerCollection = 1;

    using (BlockingCollection<T> collection1 = new BlockingCollection<T>(MaxQueuedElementsPerCollection))
    using (Task<S1> task1 = StartConsumerTask(collection1, tt1))
    using (BlockingCollection<T> collection2 = new BlockingCollection<T>(MaxQueuedElementsPerCollection))
    using (Task<S2> task2 = StartConsumerTask(collection2, tt2))
    {
        foreach (T element in tt)
        {
            collection1.Add(element);
            collection2.Add(element);
        }

        // Inform any enumerators created by .GetConsumingEnumerable()
        // that there will be no more elements added.
        collection1.CompleteAdding();
        collection2.CompleteAdding();

        // Accessing the Result property blocks until the Task<> is complete.
        return (task1.Result, task2.Result);
    }

    Task<S> StartConsumerTask<S>(BlockingCollection<T> collection, Func<IEnumerable<T>, S> func)
    {
        return Task.Run(() => func(collection.GetConsumingEnumerable()));
    }
}

Обратите внимание, что ради эффективности вы можете увеличить MaxQueuedElementsPerCollection, скажем, до 10 или 100, чтобы потребители не должны были работать в режиме блокировки друг с другом.

Существует одна проблема с этим кодом . Когда коллекция пуста, потребитель должен ждать, пока производитель произведет элемент, а когда коллекция заполнена, производитель должен ждать, пока потребитель потребит элемент. Подумайте, что происходит на полпути при выполнении вашей tt => tt.Any(k => k > 5) лямбды ...

  1. Производитель ожидает, что коллекция не будет заполнена, и добавляет 5.
  2. Потребитель ждет, пока коллекция не будет пустой, и удаляет 5.
    • 5 > 5 возвращает false и перечисление продолжается.
  3. Производитель ожидает, что коллекция не будет заполнена, и добавляет 6.
  4. Потребитель ожидает, пока коллекция не будет пустой, и удаляет 6.
    • 6 > 5 возвращает true и перечисление останавливается. Any(), лямбда и потребительская задача возвращаются.
  5. Производитель ожидает, что коллекция не будет заполнена, и добавляет 7.
  6. продюсер ожидает, что коллекция не будет полной и ... этого никогда не случится!
    • Потребитель уже отказался от перечисления, поэтому он не потребляет никаких элементов, чтобы освободить место для нового. Add() никогда не вернется.

Самый простой способ, которым я мог бы придумать, чтобы предотвратить этот тупик, - это обеспечить перечисление всей коллекции, даже если func не Сделай так. Это просто требует простого изменения StartConsumerTask<>() локального метода ...

Task<S> StartConsumerTask<S>(BlockingCollection<T> collection, Func<IEnumerable<T>, S> func)
{
    return Task.Run(
        () => {
            try
            {
                return func(collection.GetConsumingEnumerable());
            }
            finally
            {
                // Prevent BlockingCollection<>.Add() calls from
                // deadlocking by ensuring the entire collection gets
                // consumed even if func abandoned its enumeration early.
                foreach (T element in collection.GetConsumingEnumerable())
                {
                    // Do nothing...
                }
            }
        }
    );
}

Недостатком этого является то, что tt всегда будет перечисляться до завершения, даже если оба tt1 и tt2 рано покидают свои счетчики.

С учетом этого, это ...

static void Main()
{
    IEnumerable<int> mynums = Enumerable.Range(0, 10).Trace("mynums:");

    Console.WriteLine("Both: (any > 5, sum) = {0}", mynums.Both(tt => tt.Any(k => k > 5), tt => tt.Sum()));
}

... выводит это ...

mynums: 0 1 2 3 4 5 6 7 8 9.
Both: (any > 5, sum) = (True, 45)
0 голосов
/ 03 апреля 2020

Основная проблема заключается в том, кто отвечает за вызов Enumeration.MoveNext() (например, с помощью foreach l oop). Синхронизация нескольких циклов foreach между потоками будет медленной и непростой задачей.

Реализация IAsyncEnumerable<T>, чтобы несколько циклов await foreach могли по очереди обрабатывать элементы. Но все равно глупо.

Так что более простым решением было бы изменить вопрос. Вместо того, чтобы пытаться вызывать несколько методов, которые оба пытаются перечислить элементы, измените интерфейс, чтобы просто посетить каждый элемент.

...