Как эффективно выполнять сложную обработку записей, прочитанных из базы данных - PullRequest
0 голосов
/ 14 мая 2019

честное предупреждение: это вопрос подхода и, по крайней мере, хорошей практики ... Проблема здесь не в синтаксисе, а в подходе.

Мне приходится обрабатывать огромное количество записей оченьбыстро и предоставить преобразованный набор записей для потребителя.Мне интересно, есть ли у кого-нибудь практические предложения по наиболее эффективному способу сделать это.

Вот сценарий:

Мне нужно выполнить относительно простой набор логики: Подключение к БД -> Чтение записей -> Преобразование каждой записи -> Предоставление выходных записей для потребителя

Эта логика должна быть доступна из библиотеки - с внутренней логикой, полностью скрытой от потребителя.(Потребитель не знает, что происходит какое-то преобразование - он думает, что он просто перебирает кучу объектов).

Обычно я бы создал класс IEnumerable с помощью метода, подобного этому:

public class TransformingReader<T> where T:class,new()
{
...
...
...

 public IEnumerator<T> GetEnumerator()
 {
      var items = _connection<dynamic>.GetData();
      foreach (var item in items)
      {
          T transformed = _complexTask.Transform(item);
          yield return transformed;
      }
 }
}

(использование динамического класса здесь только для иллюстрации)

Используя приведенный выше класс, потребитель:

foreach(var item in new TransformingReader<TransactionAnalysis>())
{
    ...
    DoStuff(item);
    ...
}

Факты:

  1. Я обрабатываю миллионы записей в день, поэтому объем является большой проблемой.

  2. Функция DoStuff () пользователей может занять некоторое время.У меня нет реального способа предсказать, насколько сложной будет их работа, но она должна быть более интенсивной, чем моя работа.

  3. Я работаю в относительно стесненных условиях - такне очень много памяти доступно и другие приложения находятся на той же машине.Итак, мне нужно вести себя ответственно.(Я не работаю на ноутбуке дедушки - но мне все еще нужно написать разумный код, который не жадный)

Мысли:

  1. Я хочу попробовать распараллелить функцию Transform (), чтобы я мог использовать время, в течение которого DoStuff () занят, для преобразования следующих записей.Таким образом, надеюсь, я всегда (часто?) Буду готов к новой записи к тому времени, когда пользователь запросит следующую.

  2. Я бы хотел сохранитьпростой foreach-синтаксис на стороне потребителей.Потребителю не нужно знать, что я усердно работаю за кулисами.

Любые идеи о том, как решить этот тип проблемы, будут по достоинству оценены.В частности, может быть, есть шаблон, о котором я не знаю, который мог бы помочь решить эту проблему?

Ответы [ 2 ]

1 голос
/ 14 мая 2019

Да, это шаблон производитель-потребитель.

См. Трубопроводы , как вы можете это реализовать.

var records = new BlockingCollection<SomeRecord>();
var outputs = new BlockingCollection<SomeResult>();

var readRecords = Task.Run(async () =>
{
    using (var conn = new SqlConnection("..."))
    {
        conn.Open();
        using (var cmd = conn.CreateCommand())
        using (var reader = cmd.ExecuteReader())
        {
            while (reader.Read())
            {
                var record = new SomeRecord { Prop = reader.GetValue(0) };
                records.Add(record);
            }
        }
    }
});

var transformRecords = Task.Run(() =>
{
    foreach (var record in records.GetConsumingEnumerable())
    {
        // transform record
        outputs.Add(new SomeResult());
    }
});

var consumeResults = Task.Run(() =>
{
    foreach (var result in outputs.GetConsumingEnumerable())
    {
        // ...
    }
});

Task.WaitAll(readRecords, transformRecords, consumeResults);

При необходимости количество этапов конвейера можно легко увеличить, добавив новые задачи.

Преобразование легко распараллелить:

records.GetConsumingEnumerable()
       .AsParallel()
       .AsOrdered() // if you want to keep order

Если одна из задач намного быстрее других и забивает память, вы можете ограничить емкость ее сбора:

var records = new BlockingCollection<SomeRecord>(boundedCapacity: 50);
1 голос
/ 14 мая 2019

Это звучит как Проблема производителя-потребителя .

Одним из решений было бы создание потока для извлечения и преобразования данных, поток производителя .Затем в каком-то другом потоке (может быть основным) вы запускаете потребитель , пользователи DoStuff(item).Будет очередь (скорее всего, параллельная очередь ), которая будет использоваться для связи между потоками.

С точки зрения пользователей, вы все равно можете предоставлять данные в качестве перечислителя, который будет читать изочередь, блокировка, когда очередь пуста, и конец, когда она читает некое заранее определенное значение, которое сигнализирует об окончании ввода (иногда называемое ядовитая таблетка ).

Объем памяти определяется размеромочередь, так что вы можете адаптировать ее к вашим потребностям.

Этот шаблон позволяет вам увеличить количество производителей и потребителей, чтобы вы могли Transform() несколько элементов одновременно и параллельно DoStuff() с несколькими предметами одновременно.

Из вашего описания можно решить вашу проблему с помощью одного оператора Parallel LINQ (который за кадром использует вариант решения, описанного выше).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...