Многопоточность в цикле foreach? - PullRequest
4 голосов
/ 06 июля 2010

Я хочу обработать некоторые данные.У меня есть около 25 тыс. Элементов в словаре.В цикле foreach я запрашиваю базу данных, чтобы получить результаты по этому элементу.Они добавлены в словарь в качестве значения.

foreach (KeyValuePair<string, Type> pair in allPeople)
{
    MySqlCommand comd = new MySqlCommand("SELECT * FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", con);
    MySqlDataReader reader2 = comd.ExecuteReader();
    Dictionary<string, Dictionary<int, Log>> allViews = new Dictionary<string, Dictionary<int, Log>>();
    while (reader2.Read())
    {
        if (!allViews.ContainsKey(reader2.GetString("src")))
        {
            allViews.Add(reader2.GetString("src"), reader2.GetInt32("time"));
        }
    }
    reader2.Close();
    reader2.Dispose();
    allPeople[pair.Key].View = allViews;
}

Я надеялся, что смогу сделать это быстрее благодаря многопоточности.У меня доступно 8 потоков, и загрузка процессора составляет около 13%.Я просто не знаю, будет ли это работать, потому что он полагается на сервер MySQL.С другой стороны, может быть, 8 потоков откроют 8 соединений с БД, и, следовательно, будут быстрее.

В любом случае, если многопоточность поможет в моем случае, как?oO Я никогда не работал с (несколькими) потоками, поэтому любая помощь была бы полезна: D

Ответы [ 7 ]

5 голосов
/ 06 июля 2010

MySqlDataReader с состоянием - вы вызываете Read() для него, и он переходит к следующей строке, поэтому каждому потоку нужен свой собственный читатель, и вам нужно составить запрос, чтобы они получили разные значения.Это может быть не так сложно, так как у вас, естественно, много запросов с разными значениями pair.Key.

Вам также нужно либо иметь временный словарь для каждого потока, а затем объединить их, либо использовать блокировку для предотвращенияодновременное изменение словаря.

Выше предполагается, что MySQL позволит одному соединению выполнять параллельные запросы;в противном случае вам также может понадобиться несколько соединений.

Во-первых, я посмотрю, что произойдет, если вы спросите базу данных только о необходимых данных ("SELECT src,time FROM logs WHERE IP = '" + pair.Key + "' GROUP BY src") и используете GetString (0) иGetInt32 (1) вместо использования имен для поиска src и времени;также получить значения только один раз из результата.

Я также не уверен в логике - вы не упорядочиваете события журнала по времени, поэтому, какое из них возвращается первым (и поэтому хранится всловарь) может быть любым из них.

Примерно такая логика - где каждый из потоков N работает только на N -ой паре, каждый поток имеет своего собственного читателяи ничто на самом деле не меняет allPeople, только свойства значений в allPeople:

    private void RunSubQuery(Dictionary<string, Type> allPeople, MySqlConnection con, int threadNumber, int threadCount)
    {
        int hoppity = 0; // used to hop over the keys not processed by this thread

        foreach (var pair in allPeople)
        {
            // each of the (threadCount) threads only processes the (threadCount)th key
            if ((hoppity % threadCount) == threadNumber)
            {
                // you may need con per thread, or it might be that you can share con; I don't know
                MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", con);

                using (MySqlDataReader reader = comd.ExecuteReader())
                {
                    var allViews = new Dictionary<string, Dictionary<int, Log>>();

                    while (reader.Read())
                    {
                        string src = reader.GetString(0);
                        int time = reader.GetInt32(1);

                        // do whatever to allViews with src and time
                    }

                    // no thread will be modifying the same pair.Value, so this is safe
                    pair.Value.View = allViews;
                }
            }

            ++hoppity;
        }
    }

Это не проверено - у меня нет MySQL на этом компьютере, и при этом у вас нет вашегобаза данных и другие типы, которые вы используете.Это также довольно процедурно (вроде того, как вы бы это делали в Fortran с OpenMPI) вместо того, чтобы оборачивать все в объекты задач.

Вы можете запускать потоки для этого следующим образом:

    void RunQuery(Dictionary<string, Type> allPeople, MySqlConnection connection)
    {
        lock (allPeople)
        {
            const int threadCount = 8; // the number of threads

            // if it takes 18 seconds currently and you're not at .net 4 yet, then you may as well create
            // the threads here as any saving of using a pool will not matter against 18 seconds
            //
            // it could be more efficient to use a pool so that each thread takes a pair off of 
            // a queue, as doing it this way means that each thread has the same number of pairs to process,
            // and some pairs might take longer than others
            Thread[] threads = new Thread[threadCount];

            for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber)
            {
                threads[threadNumber] = new Thread(new ThreadStart(() => RunSubQuery(allPeople, connection, threadNumber, threadCount)));
                threads[threadNumber].Start();
            }

            // wait for all threads to finish
            for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber)
            {
                threads[threadNumber].Join();
            }
        }
    }

Дополнительная блокировка, удерживаемая в allPeople, выполняется таким образом, что после возврата всех потоков возникает барьер записи;Я не совсем уверен, если это необходимо.Подойдет любой объект.

Ничто из этого не гарантирует какого-либо увеличения производительности - возможно, библиотеки MySQL являются однопоточными, но сервер, безусловно, может обрабатывать несколько соединений.Измеряйте с различным числом потоков.


Если вы используете .net 4, вам не нужно возиться с созданием потоков или пропуском элементов, с которыми вы не работаете:

    // this time using .net 4 parallel; assumes that connection is thread safe
    static void RunQuery(Dictionary<string, Type> allPeople, MySqlConnection connection)
    {
        Parallel.ForEach(allPeople, pair => RunPairQuery(pair, connection));
    }

    private static void RunPairQuery(KeyValuePair<string, Type> pair, MySqlConnection connection)
    {
        MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", connection);

        using (MySqlDataReader reader = comd.ExecuteReader())
        {
            var allViews = new Dictionary<string, Dictionary<int, Log>>();

            while (reader.Read())
            {
                string src = reader.GetString(0);
                int time = reader.GetInt32(1);

                // do whatever to allViews with src and time
            }

            // no iteration will be modifying the same pair.Value, so this is safe
            pair.Value.View = allViews;
        }
    }
3 голосов
/ 06 июля 2010

Самая большая проблема, которая приходит на ум, заключается в том, что вы собираетесь использовать многопоточность для добавления значений в словарь, который не является поточно-ориентированным.

Вам нужно будет что-то сделать , например , чтобы это сработало, и вы, возможно, не получите большую выгоду от его реализации, поскольку это все еще требует блокировки объекта словаря для добавления значение.

1 голос
/ 06 июля 2010

Допущения:

  1. Есть столик Люди в вашем база данных
  2. Есть много людей в ваша база данных

Каждый запрос к базе данных добавляет накладных расходов: вы выполняете один запрос БД для каждого человека в вашей базе данных. Я бы посоветовал быстрее вернуть все данные в одном запросе, чем делать повторные вызовы.

select l.ip,l.time,l.src 
  from logs l, people p 
  where l.ip = p.ip
  group by l.ip, l.src

Попробуйте это с циклом в одном потоке, я верю, что это будет намного быстрее, чем ваш существующий код.

Имея в своем существующем коде еще одну вещь, которую вы можете сделать, это вывести создание MySqlCommand из цикла, подготовить его заранее и просто изменить параметр. Это должно ускорить выполнение SQL. см http://dev.mysql.com/doc/refman/5.0/es/connector-net-examples-mysqlcommand.html#connector-net-examples-mysqlcommand-prepare

MySqlCommand comd = new MySqlCommand("SELECT * FROM `logs` WHERE IP = ?key GROUP BY src", con);
comd.prepare();
comd.Parameters.Add("?key","example");
foreach (KeyValuePair<string, Type> pair in allPeople)
{
    comd.Parameters[0].Value = pair.Key;

Если вы используете несколько потоков, каждому потоку по-прежнему будет нужна собственная команда, хотя бы в MS-SQL это все равно было бы быстрее, даже если вы каждый раз заново создаете и подготавливаете статистику благодаря способности сервера SQL быть в состоянии кэшировать план выполнения парамертированного отчета.

1 голос
/ 06 июля 2010

Прежде чем делать что-либо еще, выясните, где именно проводится время.Проверьте план выполнения запроса.Первое, что я подозреваю, это отсутствие индекса в logs.IP.

18 минут для чего-то подобного мне кажется слишком долгим.Даже если вы можете сократить время выполнения в восемь, добавив больше потоков (что маловероятно!), Вы все равно будете использовать более 2 минут.Вероятно, вы могли бы прочитать целые 25 тыс. Строк в память менее чем за пять секунд и выполнить необходимую обработку в памяти ...

РЕДАКТИРОВАТЬ: Просто чтобы уточнить, я не защищаю на самом деле делать это в памяти, просто говоряпохоже, что здесь есть более узкое место, которое можно устранить.

0 голосов
/ 06 июля 2010

Это звучит как идеальная работа для карт / уменьшения, я не .Net-программист, но это кажется разумным руководством: http://ox.no/posts/minimalistic-mapreduce-in-net-4-0-with-the-new-task-parallel-library-tpl

0 голосов
/ 06 июля 2010

Спасибо всем за вашу помощь.В настоящее время я использую этот

for (int i = 0; i < 8; i++)
{
    ThreadPool.QueueUserWorkItem(addDistinctScres, i);
}

ThreadPool для запуска всех потоков.Я использую метод, предоставленный Питом Киркхэмом, и создаю новое соединение для каждого потока.Время сократилось до 4 минут.

Далее я заставлю что-нибудь подождать обратный вызов пула потоков?перед выполнением других функций.

Я думаю, что теперь узким местом является сервер MySQL, поскольку загрузка ЦП снижается.

@ странный паритет Я думал об этом, но реальная вещь - это больше, чем просто25 тыс строкИДК, если это сработает.

0 голосов
/ 06 июля 2010

Я думаю, что если вы используете это на многоядерной машине, вы могли бы получить выгоду от многопоточности.

Однако, как я бы подошел, сначала взгляните на разблокирование потока, который вы используете в данный момент, выполнив асинхронные вызовы базы данных.Обратные вызовы будут выполняться в фоновых потоках, так что вы получите многоядерное преимущество и не будете блокировать потоки, ожидающие возвращения БД.

Для приложений с интенсивным вводом-выводом, как в этом примере, звучит так, как вывероятно, увидят улучшенную пропускную способность в зависимости от того, с какой нагрузкой может справиться БД.Предполагая, что дб масштабирует для обработки более одного одновременного запроса, вы должны быть хороши.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...