Есть ли способ использовать Task Parallel Library (TPL) с SQLDataReader? - PullRequest
20 голосов
/ 23 июня 2010

Мне нравится простота методов расширения Parallel.For и Parallel.ForEach в TPL. Мне было интересно, есть ли способ воспользоваться чем-то похожим или даже немного более продвинутыми Задачами.

Ниже приведено типичное использование SqlDataReader, и мне было интересно, возможно ли это, и если да, то как заменить цикл while ниже чем-то в TPL. Поскольку читатель не может предоставить фиксированное количество итераций, метод For extension не возможен, что оставляет дело с Задачами, которые я собираюсь собрать. Я надеялся, что кто-то, возможно, уже занялся этим и решил что-то делать и чего не делать с ADO.net.

using (SqlConnection conn = new SqlConnection("myConnString"))
using (SqlCommand comm = new SqlCommand("myQuery", conn))
{
    conn.Open();

    SqlDataReader reader = comm.ExecuteReader();

    if (reader.HasRows)
    {
        while (reader.Read())
        {
            // Do something with Reader
        }
    }
}

Ответы [ 2 ]

25 голосов
/ 23 июня 2010

Вам будет трудно заменить этот цикл while напрямую. SqlDataReader не Потокобезопасный класс, поэтому вы не можете использовать его напрямую из нескольких потоков.

При этом вы можете обработать данные, которые вы прочитали, используя TPL. Здесь есть несколько вариантов. Самым простым может быть создание собственной реализации IEnumerable<T>, которая работает с читателем и возвращает класс или структуру, содержащую ваши данные. Затем вы можете использовать PLINQ или оператор Parallel.ForEach для параллельной обработки ваших данных:

public IEnumerable<MyDataClass> ReadData()
{
    using (SqlConnection conn = new SqlConnection("myConnString"))
    using (SqlCommand comm = new SqlCommand("myQuery", conn))
    {
        conn.Open();

        SqlDataReader reader = comm.ExecuteReader();

        if (reader.HasRows)
        {
            while (reader.Read())
            {
                yield return new MyDataClass(... data from reader ...);
            }
        }
    }
}

Если у вас есть этот метод, вы можете обработать его напрямую, через PLINQ или TPL:

Parallel.ForEach(this.ReadData(), data =>
{
    // Use the data here...
});

Или:

this.ReadData().AsParallel().ForAll(data => 
{
    // Use the data here...
});
19 голосов
/ 23 июня 2010

Ты почти у цели.Оберните код, отправленный вами в функцию, с такой подписью:

IEnumerable<IDataRecord> MyQuery()

, а затем замените код // Do something with Reader следующим:

yield return reader;

Теперь у вас есть что-то, что работает в одномнить.К сожалению, когда вы читаете результаты запроса, он каждый раз возвращает ссылку на один и тот же объект, и объект просто мутирует сам для каждой итерации.Это означает, что если вы попытаетесь запустить его параллельно, вы получите действительно странные результаты, поскольку параллельное чтение изменяет объект, используемый в разных потоках.Вам нужен код, чтобы взять копию записи для отправки в параллельный цикл.

Однако в этот момент мне нравится пропускать дополнительную копию записи и переходить прямо к строго типизированному классу.Более того, мне нравится использовать универсальный метод для этого:

IEnumerable<T> GetData<T>(Func<IDataRecord, T> factory, string sql, Action<SqlParameterCollection> addParameters)
{
    using (var cn = new SqlConnection("My connection string"))
    using (var cmd = new SqlCommand(sql, cn))
    {
        addParameters(cmd.Parameters);

        cn.Open();
        using (var rdr = cmd.ExecuteReader())
        {
            while (rdr.Read())
            {
                yield return factory(rdr);
            }
        }
    }
}

Предполагая, что ваши фабричные методы создают копию, как и ожидалось, этот код должен быть безопасным для использования в цикле Parallel.ForEach.Вызов метода будет выглядеть примерно так (при условии, что класс Employee со статическим фабричным методом с именем «Create»):

var UnderPaid = GetData<Employee>(Employee.Create, 
       "SELECT * FROM Employee WHERE AnnualSalary <= @MinSalary", 
       p => {
           p.Add("@MinSalary", SqlDbType.Int).Value = 50000;
       });
Parallel.ForEach(UnderPaid, e => e.GiveRaise());

Важное обновление:
Я нетак же уверен в этом коде, как я когда-то был.Отдельный поток все еще может видоизменить читатель, в то время как другой поток находится в процессе создания его копии.Я мог бы заблокировать это, но я также обеспокоен тем, что другой поток может вызвать обновление читателя после того, как сам оригинал вызвал Read (), но до того, как он начнет делать копию.Таким образом, критический раздел здесь состоит из всего цикла while ... и на этом этапе вы снова возвращаетесь к однопоточному.Я ожидаю, что есть способ изменить этот код, чтобы он работал как ожидается для многопоточных сценариев, но это потребует дополнительного изучения.

...