Parallel.Foreach с OledbDataReader для вызова веб-API вызывает дублирование строк - PullRequest
0 голосов
/ 01 мая 2018

У меня есть таблица БД с 7 столбцами и 10 строк. В каждой строке предоставляется входной параметр для вызова веб-API, а ответ, возвращаемый API-интерфейсом, вставляется в таблицу. Моя проблема в том, что Parallel.Foreach не дает тот же результат, что и обычный ForEach.

В частности, если в 1-й строке указан адрес «123 Jump Street Arizona Us», я получаю ответ от веб-интерфейса API со стандартным адресом «123 Jump Street Arizona USA», например, у меня есть 10 разных строк с 10 разными входами. адрес. Однако выходной ответ, полученный от Parallel.Foreach, повторяется для одного и того же адреса 5 раз. И в следующий раз, когда я его запущу, это совсем другой результат

Может ли кто-нибудь указать, почему это происходит, и потенциальное решение?

Вот мой код:

public void Main()
{
    // TODO: Add your code here

    string query = "SELECT  ADDR_LINE_ONE,ADDR_LINE_TWO,ADDR_LINE_THREE,COUNTRY,PROVINCE,CITY_NAME,POSTAL_CODE FROM Addresstestpoc";

    try
    {
        using (OleDbConnection connection = new OleDbConnection(conn))
        {
            OleDbCommand command = new OleDbCommand(query, connection);

            connection.Open();
            OleDbDataReader reader = command.ExecuteReader();
            int i = reader.FieldCount;
            bool b = reader.HasRows;
            Parallel.ForEach(GetFromReader(reader), record =>
                {                                                                      
                    //AddrOne = record[0].ToString();                                                            
                    string AddrOne = record.GetString(0);
                    string AddrTwo = record.GetString(1);
                    string AddrThree = record.GetString(2);                                                                                  
                    string Country = record.GetString(3);                                                          
                    string Province = record.GetString(4);                                                                                  
                    string City = record.GetString(5);                                                                                  
                    string PostalCode = record.GetString(6);                                                                                 
                    string Sender = "G";                                                                               
                    //sqlk = (string)Dts.Variables["User::sqlconn"].Value;                                                                               
                    standardizeAddressReturn result;                                                                                
                    string data = string.Empty;                                                                                
                    string queri;


                    MDMStandardizeAddressService web = new MDMStandardizeAddressService();

                    try
                    {                                                                                   
                        result = web.standardizeAddress(AddrOne, AddrTwo, AddrThree, City, Province, PostalCode, Country, Sender);

                        data = SerializeToXml(result);                                                                                    
                        queri = "insert into [CPM].[dbo].[AddressResponsetest_new] values ('" + data + "')";                                                             
                        //MessageBox.Show(data);                                                         
                        insertintosql(queri);                                                                                
                    }                                                    
                    catch (Exception ex)
                    {                                                                            
                        MessageBox.Show(ex.Message);
                    }
               });

        }
    }
    catch (Exception ex)
    {
        string msg = ex.Message;
    }

}

IEnumerable<IDataRecord> GetFromReader(IDataReader reader)
{
    while (reader.Read()) yield return reader;
}

1 Ответ

0 голосов
/ 01 мая 2018

Я думаю, вам нужен другой подход. Command и DataReader не являются потокобезопасными. Даже если DataReader был поточно-ориентированным, это курсор только вперед, поэтому он не будет быстрее.

Я рекомендую шаблон потребителя производителя (например, BlockingCollection). В потребителя, где вы можете параллельно обрабатывать

MDMStandardizeAddressService web = new MDMStandardizeAddressService();
try
{

Возможно, вы могли бы использовать задачи, ждать, асинхронно для потребителя производителя.

Более простой подход может состоять в том, чтобы создать класс для свойств и поместить его в список, а затем просто прочитать этот список. Это произойдет, если доли секунды.

Затем вы можете параллельно обрабатывать список.

Код заглушки:

public class WebMailer
{ 
    public void process()
    {
        List<Addr> Addrs = new List<Addr>();
        SqlCommand command = new SqlCommand();
        using (var reader = command.ExecuteReader())
        {
            using (SqlDataReader r = command.ExecuteReader())
            {
                Addrs.Add(new Addr(r.GetString(0), r.GetString(1)));
            }
        }
        foreach(Addr addr in Addrs)  // can use parallel here
        { }
    }
}
s
...