C # Windows-сервис вложенный Parallel.ForEach - PullRequest
0 голосов
/ 20 марта 2019

Я разработал службу Windows, которая запрашивает 70 баз данных каждые 5 секунд. Для каждой записи в базе данных данные извлекаются и записываются в XML-файл в файловой системе. Целью этого сервиса является своевременная обработка данных каждой базы данных. Время от времени в таблице очередей создается более 9000 записей. На данный момент активировано только 6 баз данных.

Есть два раздела: В каждой базе данных есть один цикл foreach и один для таблицы очередей. Кроме того, количество обработанных записей ограничено 200, чтобы базы данных обрабатывались друг за другом.

В следующем фрагменте кода показан метод OnStart () службы Windows.

protected override void OnStart(string[] args)
{
    mainTask = new Task(PollQueue, cts.Token, TaskCreationOptions.None);
    mainTask.Start();
}

Из-за нехватки времени вызовы должны распараллеливаться. Внешний цикл, который перебирает базы данных, был изменен на Parallel.ForEach (). Это работало нормально для 6 баз данных, которые уже активны. После этого внутренний цикл, повторяющийся над таблицей очередей, также был изменен на Parallel.ForEach ().

Пока активна только одна база данных, это тоже работает. Но как только другой активируется, обработка останавливается. Ограничение числа потоков с помощью MaxDegreeOfParallelism не помогло. Без этого ограничения число потоков было около 200. Но была обработана только одна база данных.

Вот пример кода. Метод HandleQueueForEachLoop показывает первую реализацию, а метод HandleQueueForEachParallel - тест с Parallel.ForEach. Этот пример не является исполняемым, но показывает реализацию всех.

using Oracle.ManagedDataAccess.Client;
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp8
{
    class Program
    {
        static void Main(string[] args)
        {
            HandleQueueForEachLoop();
            HandleQueueForEachParallel();
        }

        public static void HandleQueueForEachLoop()
        {
            NameValueCollection branches = (NameValueCollection)ConfigurationManager.GetSection("Branches");
            foreach (var branch in branches.AllKeys)
            {
                using (OracleConnection conn = new OracleConnection(branch))
                {
                    conn.Open();
                    var queue = ReadQueue(QueueEntityDescriptor, conn, MaximumRows);
                    if (queue.Count > 0)
                    {
                        List< DefaultEntity> entitiesWithData = FetchQueueData(queue, conn);
                        foreach (var entity in entitiesWithData)
                        {
                            if (entity.Records.Count == 0)
                            {
                                UpdateQueue(entity.EntityQueueId, QueueEntityDescriptor, conn, EntityQueueErrorEnum.NoRecordsInSelect);
                            }
                            else
                            {
                                try
                                {
                                    List<XElement> entityXml = CreateXmlForEntity(entity);
                                    var path = XmlPath;
                                    if (XMLPathSubFolder)
                                    {
                                        path = path + entity.EntityName + "\\";
                                    }

                                    foreach (var item in entityXml)
                                    {
                                        WriteDataXml(item, path, ".work");
                                    }

                                    UpdateQueue(entity.EntityQueueId, EntityQueueErrorEnum.Handled);
                                }
                                catch (Exception ex)
                                {
                                    UpdateQueue(entity.EntityQueueId, EntityQueueErrorEnum.Exception);
                                }
                            }

                        }
                    }
                }
            }
        }
        public static void HandleQueueForEachParallel()
        {
            NameValueCollection branches = (NameValueCollection)ConfigurationManager.GetSection("Branches");
            Parallel.ForEach(branches.AllKeys, branch =>
            {
                using (OracleConnection conn = new OracleConnection(branch))
                {
                    conn.Open();
                    var queue = ReadQueue(QueueEntityDescriptor, conn, MaximumRows);
                    if (queue.Count > 0)
                    {
                        Parallel.ForEach(queue,
                        new ParallelOptions { MaxDegreeOfParallelism = Convert.ToInt32(Math.Ceiling((Environment.ProcessorCount * 0.75) * 2.0)) },
                        queueItem =>
                        {
                            DefaultEntity entity = FetchQueueEntryData(queueItem, conn);
                            if (entity.Records.Count == 0)
                            {
                                UpdateQueue(entity.EntityQueueId, EntityQueueErrorEnum.NoRecordsInSelect);
                            }
                            else
                            {
                                try
                                {
                                    var entityXml = CreateXmlForEntity(entity);
                                    var path = XmlPath;
                                    if (XMLPathSubFolder)
                                    {
                                        path = path + entity.EntityName + "\\";
                                    }

                                    foreach (var item in entityXml)
                                    {
                                        WriteData(item, path, ".work");
                                    }

                                    UpdateQueue(entity.EntityQueueId, EntityQueueErrorEnum.Handled);
                                }
                                catch (Exception ex)
                                {
                                    UpdateQueue(entity.EntityQueueId, EntityQueueErrorEnum.Exception);
                                }
                            }
                        });
                    }
                }
            });
        }
    }
}

Сервер имеет 4 ядра ЦП, которые практически не используются. Мы создаем только одно соединение для каждой базы данных с помощью пакета NuGet OracleManageDataAccess. XML-файлы пишутся с помощью XElement.Save ().

Я уже много читал о Parallel.ForEach () и искал несколько учебных пособий.

Мои вопросы:

  1. Разумно ли использовать вложенные циклы Parallel.ForEach?
  2. Существуют ли другие рекомендации для параллельной обработки нескольких баз данных?

Спасибо

...