C # добавить в очередь параллельно и прослушивать распространение очереди - PullRequest
0 голосов
/ 04 марта 2019

Я новичок в многопоточном программировании.У меня есть программа, которая должна запросить базу данных, а затем выполнить некоторые манипуляции с данными возвращенных данных.Из-за структуры моей организации мне нужно сделать отдельный вызов в базу данных, чтобы получить информацию об учетной записи одного пользователя.Моя задача заключается в сборе данных по тысячам учетных записей.

В настоящее время я использую Parallel.ForEach () для запроса базы данных и добавления всех элементов в ConcurrentList.Как только все данные были возвращены из базы данных, я затем выполняю свои манипуляции синхронно.

Помимо любых вопиющих проблем, единственное, что мне не нравится, - это хранить большой список в памяти и по существу блокироваться додлительный процесс базы данных закончен.Я хотел бы иметь возможность помещать данные в очередь, а затем начать обработку данных, как только данные будут добавлены.Процесс потребления не должен быть параллельным или асинхронным.Мне просто нужно, чтобы он мог прослушивать, когда что-то добавляется в очередь или что очередь не пуста.

Параллельный процесс:

public static ConcurrentBag<CombinedAccountInfo> GetAllAccountInfo(List<AccountInfo> accountList, string dbConnName)
    {
        logger.Info("Fetching Data");
        var concurrentCombinedData = new ConcurrentBag<CombinedAccountInfo>();
        Parallel.ForEach(accountList, new ParallelOptions { MaxDegreeOfParallelism = 5 }, r =>
        {
            try
            {
                var userPrefs = new List<UserPreference>().queryData(Queries.UserPrefQuery, dbConnName);

                concurrentCombinedData.Add(new CombinedAccountInfo()
                {
                    AccountName = r.AccountName,
                    AccountId = r.AccountId,
                    LastLoginDate = r.LastLoginDate,
                    AccountHandle = r.AccountHandle,
                    UserPreferences = userPrefs 
                });
            }
            catch (Exception e)
            {
                logger.Error(e);
            }
        });

        return concurrentCombinedTransaction;
    }

Iнемного прочитал о потоке данных и увидел несколько статей о Reactive Extensions.Однако я могу найти более простые примеры того, как несколько производителей объединяются в одного потребителя.Будем весьма благодарны за любые предложения или идеи, как лучше достичь конечной цели.

решено

Я буду использовать ответ, который Скотт Ханнен предоставлена.Поскольку манипулирование является небольшим и не очень интенсивным, каждый процесс может его обработать, а не пытаться связать все обратно в список.

Ответы [ 2 ]

0 голосов
/ 04 марта 2019

Хотя я действительно считаю, что вам следует запрашивать все пользовательские настройки одновременно, поскольку это повысит производительность вашей базы данных (на самом деле BIG TIME ), если вы хотите что-то вроде этого:

public void Answer<T>(List<Guid> ids)
{
    var stack = new ConcurrentStack<T>();

    Parallel.ForEach(ids, (id) =>
    {
        T value = GetData<T>(id);

        stack.Push(value);
    });

    Parallel.For(0, ids.Count, (i) =>
    {
        T item;
        while (!stack.TryPop(out item))
        {
            // sleep
        }
        Process(item);
    });
}

Но я уже упоминал, я думаю, вы не должны идти туда?

0 голосов
/ 04 марта 2019

Если вы хотите работать с каждой учетной записью, когда вы извлекаете ее из базы данных, тогда вы можете сделать именно это вместо добавления элементов в ConcurrentBag<CombinedAccountInfo>.

public static ConcurrentBag<CombinedAccountInfo> GetAllAccountInfo(
    List<AccountInfo> accountList, 
    string dbConnName,
    Action<CombinedAccountInfo> doSomethingWithTheAccountInfo)

Затем, когда вы получитекаждый элемент из базы данных,

doSomethingWithTheAccountInfo(accountInfo);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...