C # Nested Parallel.ForEach при вставке в базу данных SQL - PullRequest
0 голосов
/ 27 июня 2018

У нас есть объект (XML или JSON), и мы успешно сопоставляем его с DTO, вставка в нашу базу данных занимает слишком много времени (5 ~ 7 минут), поэтому мы прошли Parallel.ForEach, но в итоге мы заметили, что некоторые данные введены неправильно, например, Category содержит все элементы с одинаковым именем, но другие разные свойства на 100% верны, в другом случае мы заметили, что все данные в одной категории совпадают, хотя при условии, что у объекта JSON этого нет.

Признаюсь, это так быстро, это занимает меньше минуты, но при неправильной вставке, посмотрите ниже используемый код:

JSON

[
  {
    "CategoryId": 1,
    "CategoryName": "Drinks",
    "SortOrder": 1,
    "Products": [
      {
        "ProductId": 100,
        "ProductName": "Black Tea",
        "SortOrder": 1,
        "Price": 5,
        "Choices": []
      },
      {
        "ProductId": 101,
        "ProductName": "Turkish Coffee",
        "SortOrder": 2,
        "Price": 7.5,
        "Choices": []
      },
      {
        "ProductId": 102,
        "ProductName": "Green Tea",
        "SortOrder": 3,
        "Price": 6,
        "Choices": []
      },
      {
        "ProductId": 103,
        "ProductName": "Café Latte Medium",
        "SortOrder": 4,
        "Price": 10,
        "Choices": []
      },
      {
        "ProductId": 104,
        "ProductName": "Orange Juice",
        "SortOrder": 5,
        "Price": 11,
        "Choices": []
      },
      {
        "ProductId": 105,
        "ProductName": "Mixed Berry Juice",
        "SortOrder": 6,
        "Price": 12.5,
        "Choices": []
      }
    ]
  },
  {
    "CategoryId": 1,
    "CategoryName": "Meals",
    "SortOrder": 1,
    "Products": [
      {
        "ProductId": 200,
        "ProductName": "Breakfast Meal",
        "SortOrder": 1,
        "Price": 16,
        "Choices": [
          {
            "ChoiceId": 3000,
            "ChoiceName": "Strawberry Jam",
            "SortOrder": 1,
            "Price": 0
          },
          {
            "ChoiceId": 3001,
            "ChoiceName": "Apricot Jam",
            "SortOrder": 2,
            "Price": 0
          },
          {
            "ChoiceId": 3002,
            "ChoiceName": "Orange Jam",
            "SortOrder": 3,
            "Price": 0
          },
          {
            "ChoiceId": 3003,
            "ChoiceName": "Café Latte",
            "SortOrder": 4,
            "Price": 2
          }
        ]
      },
      {
        "ProductId": 201,
        "ProductName": "Mixed Grill",
        "SortOrder": 1,
        "Price": 30,
        "Choices": [
          {
            "ChoiceId": 3004,
            "ChoiceName": "Moutabal",
            "SortOrder": 1,
            "Price": 0
          },
          {
            "ChoiceId": 3005,
            "ChoiceName": "Mineral Water",
            "SortOrder": 2,
            "Price": 0
          },
          {
            "ChoiceId": 3006,
            "ChoiceName": "French Fries",
            "SortOrder": 2,
            "Price": 0
          },
          {
            "ChoiceId": 3007,
            "ChoiceName": "Grilled Potatoes",
            "SortOrder": 2,
            "Price": 0
          }
        ]
      }
    ]
  }
]

код C #

Parallel.ForEach(categories, (category) =>
{
    var newCreatedCategoryId = 0;
    using (var connection = new SqlConnection("CONNECTION_STRING_HERE"))
    {
        connection.Open();
        using (var command = new SqlCommand("SP_INSERT_INTO_CATEGORIES", connection))
        {
            command.CommandType = CommandType.StoredProcedure;
            command.Parameters.AddWithValue("@P1", category.CategoryName);
            command.Parameters.AddWithValue("@P2", category.SortOrder);
            newCreatedCategoryId = int.Parse(command.ExecuteScalar().ToString());
            command.Dispose();
        }

        connection.Close();
    }

    if (newCreatedCategoryId > 0)
    {
        Parallel.ForEach(category.Products, (product) =>
        {
            using (var connection = new SqlConnection("CONNECTION_STRING_HERE"))
            {
                connection.Open();
                using (var command = new SqlCommand("SP_INSERT_INTO_PRODUCTS", connection))
                {
                    command.CommandType = CommandType.StoredProcedure;
                    command.Parameters.AddWithValue("@P1", product.ProductName);
                    command.Parameters.AddWithValue("@P2", product.Price);
                    command.Parameters.AddWithValue("@P3", product.SortOrder);
                    command.Parameters.AddWithValue("@P4", newCreatedCategoryId);
                    command.ExecuteNonQuery();
                    command.Dispose();
                }

                connection.Close();
            }
        });
    }
});

Я посмотрел здесь , но это не наша проблема, мы уже используем SCOPE_IDENTITY(), чтобы получить последний сгенерированный идентификатор в текущей области выполнения.

С другой стороны, нельзя использовать SqlBulkCopy для вставки этого количества данных, даже если без TableLock.

Ответы [ 3 ]

0 голосов
/ 27 июня 2018

Зацикливаемые объекты не являются поточно-ориентированными. Вы можете добавить объект блокировки, однако это приведет к сериализации операции и уничтожению цели Parallel.Foreach. You need to change the Parallel.ForEach в стандартном цикле ForEach.

Потенциальные ловушки в параллелизме данных и задач

0 голосов
/ 27 июня 2018

Проблема в newCreatedCategoryId, меня смущает, почему вы снова вызываете newCreatedCategoryId = int.Parse(command.ExecuteScalar().ToString()); во внутреннем цикле. Я имею в виду, если это просто идентификатор категории, его не нужно увеличивать снова.

Взгляните на правку ниже. Вам также может быть лучше просто поместить вторую Parallel.ForEach в стандартную foreach Я имею в виду, что все равно все работает параллельно. Наконец, Parallel.ForEach на самом деле не подходит для задач ввода-вывода, правильный шаблон - асинхронный и ожидающий. сказав, что вы, вероятно, могли бы использовать ActionBlock из потока данных TPL, чтобы воспользоваться преимуществами обоих миров. Взгляните на пример потока данных в этом вопросе, на который я ответил Быстрая загрузка более 1000 файлов?

Parallel.ForEach(categories, (category) =>
{
    var newCreatedCategoryId = 0;
    using (var connection = new SqlConnection("CONNECTION_STRING_HERE"))
    {
        connection.Open();
        using (var command = new SqlCommand("SP_INSERT_INTO_CATEGORIES", connection))
        {
            command.CommandType = CommandType.StoredProcedure;
            command.Parameters.AddWithValue("@P1", category.CategoryName);
            command.Parameters.AddWithValue("@P2", category.SortOrder);
            newCreatedCategoryId = int.Parse(command.ExecuteScalar().ToString());
            command.Dispose();
        }

        connection.Close();
    }

    if (newCreatedCategoryId > 0)
    {
        foreach(product in category.Products)
        {
            using (var connection = new SqlConnection("CONNECTION_STRING_HERE"))
            {
                connection.Open();
                using (var command = new SqlCommand("SP_INSERT_INTO_PRODUCTS", connection))
                {
                    command.CommandType = CommandType.StoredProcedure;
                    command.Parameters.AddWithValue("@P1", product.ProductName);
                    command.Parameters.AddWithValue("@P2", product.Price);
                    command.Parameters.AddWithValue("@P3", product.SortOrder);
                    command.Parameters.AddWithValue("@P4", newCreatedCategoryId);
                    command.Dispose();
                }

                connection.Close();
            }
        }//);
    }
});
0 голосов
/ 27 июня 2018

Вы изменяете newCreatedCategoryId внутри Parallel.ForEach, что может привести к неверным данным, поскольку запросы не будут выполняться по порядку.

...