MySqlDataReader
с состоянием - вы вызываете Read()
для него, и он переходит к следующей строке, поэтому каждому потоку нужен свой собственный читатель, и вам нужно составить запрос, чтобы они получили разные значения.Это может быть не так сложно, так как у вас, естественно, много запросов с разными значениями pair.Key.
Вам также нужно либо иметь временный словарь для каждого потока, а затем объединить их, либо использовать блокировку для предотвращенияодновременное изменение словаря.
Выше предполагается, что MySQL позволит одному соединению выполнять параллельные запросы;в противном случае вам также может понадобиться несколько соединений.
Во-первых, я посмотрю, что произойдет, если вы спросите базу данных только о необходимых данных ("SELECT src,time FROM
logs WHERE IP = '" + pair.Key + "' GROUP BY src"
) и используете GetString (0) иGetInt32 (1) вместо использования имен для поиска src и времени;также получить значения только один раз из результата.
Я также не уверен в логике - вы не упорядочиваете события журнала по времени, поэтому, какое из них возвращается первым (и поэтому хранится всловарь) может быть любым из них.
Примерно такая логика - где каждый из потоков N работает только на N -ой паре, каждый поток имеет своего собственного читателяи ничто на самом деле не меняет allPeople
, только свойства значений в allPeople
:
private void RunSubQuery(Dictionary<string, Type> allPeople, MySqlConnection con, int threadNumber, int threadCount)
{
int hoppity = 0; // used to hop over the keys not processed by this thread
foreach (var pair in allPeople)
{
// each of the (threadCount) threads only processes the (threadCount)th key
if ((hoppity % threadCount) == threadNumber)
{
// you may need con per thread, or it might be that you can share con; I don't know
MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", con);
using (MySqlDataReader reader = comd.ExecuteReader())
{
var allViews = new Dictionary<string, Dictionary<int, Log>>();
while (reader.Read())
{
string src = reader.GetString(0);
int time = reader.GetInt32(1);
// do whatever to allViews with src and time
}
// no thread will be modifying the same pair.Value, so this is safe
pair.Value.View = allViews;
}
}
++hoppity;
}
}
Это не проверено - у меня нет MySQL на этом компьютере, и при этом у вас нет вашегобаза данных и другие типы, которые вы используете.Это также довольно процедурно (вроде того, как вы бы это делали в Fortran с OpenMPI) вместо того, чтобы оборачивать все в объекты задач.
Вы можете запускать потоки для этого следующим образом:
void RunQuery(Dictionary<string, Type> allPeople, MySqlConnection connection)
{
lock (allPeople)
{
const int threadCount = 8; // the number of threads
// if it takes 18 seconds currently and you're not at .net 4 yet, then you may as well create
// the threads here as any saving of using a pool will not matter against 18 seconds
//
// it could be more efficient to use a pool so that each thread takes a pair off of
// a queue, as doing it this way means that each thread has the same number of pairs to process,
// and some pairs might take longer than others
Thread[] threads = new Thread[threadCount];
for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber)
{
threads[threadNumber] = new Thread(new ThreadStart(() => RunSubQuery(allPeople, connection, threadNumber, threadCount)));
threads[threadNumber].Start();
}
// wait for all threads to finish
for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber)
{
threads[threadNumber].Join();
}
}
}
Дополнительная блокировка, удерживаемая в allPeople, выполняется таким образом, что после возврата всех потоков возникает барьер записи;Я не совсем уверен, если это необходимо.Подойдет любой объект.
Ничто из этого не гарантирует какого-либо увеличения производительности - возможно, библиотеки MySQL являются однопоточными, но сервер, безусловно, может обрабатывать несколько соединений.Измеряйте с различным числом потоков.
Если вы используете .net 4, вам не нужно возиться с созданием потоков или пропуском элементов, с которыми вы не работаете:
// this time using .net 4 parallel; assumes that connection is thread safe
static void RunQuery(Dictionary<string, Type> allPeople, MySqlConnection connection)
{
Parallel.ForEach(allPeople, pair => RunPairQuery(pair, connection));
}
private static void RunPairQuery(KeyValuePair<string, Type> pair, MySqlConnection connection)
{
MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", connection);
using (MySqlDataReader reader = comd.ExecuteReader())
{
var allViews = new Dictionary<string, Dictionary<int, Log>>();
while (reader.Read())
{
string src = reader.GetString(0);
int time = reader.GetInt32(1);
// do whatever to allViews with src and time
}
// no iteration will be modifying the same pair.Value, so this is safe
pair.Value.View = allViews;
}
}