Параллельные потоки, вызывающие один и тот же метод, приводят к дубликатам в коллекции ConcurrentBag <DataRow> - PullRequest
1 голос
/ 02 апреля 2020

Мне нужно обработать большой набор данных, который занимает более часа в одном потоке. Я реализовал несколько многопоточности, чтобы ускорить это. Каждый поток обрабатывает определенный диапазон данных c без наложения, но когда они вставляют свои результаты в созданную мной коллекцию ConcurrentBag<DataRow>, появляются некоторые дубликаты.

Как это возможно? Любые предложения о том, что я мог бы сделать лучше, приветствуются!

Основной метод:

public static ConcurrentBag<DataRow> finalRowList = new ConcurrentBag<DataRow>(); //Create a concurrent collection of datarows so we can thread these calculations
public static DataTable results = new DataTable(); //Final datatable the datarows are added to

static void Main(string[] args)
{
//The goal is to calculate correlation between each item in list 1 against each item in list 2
List<string> Variable1List = populateVariable1List(); //Primary List of distinct items to iterate over
List<string> Variable2List = populateVariable2List(); //Secondary list of distinct items

DateTime endDate = new DateTime(2020, 3, 31);

//Separate threads based on alphabetic ranges so there is no overlap
Thread t1 = new Thread(() => CalculatePairCorrelation(Variable1List.Where(s => string.Compare(s, "G") < 0), Variable2List, endDate));
Thread t2 = new Thread(() => CalculatePairCorrelation(Variable1List.Where(s => string.Compare(s, "G") >= 0 && string.Compare(s, "M") < 0), Variable2List, endDate));
Thread t3 = new Thread(() => CalculatePairCorrelation(Variable1List.Where(s => string.Compare(s, "M") >= 0 && string.Compare(s, "S") < 0), Variable2List, endDate));
Thread t4 = new Thread(() => CalculatePairCorrelation(Variable1List.Where(s => string.Compare(s, "S") >= 0), Variable2List, endDate));

List<Thread> threads = new List<Thread>();
threads.Add(t1);
threads.Add(t2);
threads.Add(t3);
threads.Add(t4);

foreach (Thread t in threads)
{
    t.Start();
}

foreach (Thread t in threads)
{
    t.Join();
}

//Add rows from finalRowList to final datatable
foreach (var dr in finalRowList)
{
    results.Rows.Add(dr);
}
}

CalculatePairCorrelation () code:

public static void CalculatePairCorrelation(IEnumerable<string> list1, IEnumerable<string> list2, DateTime endDate, int rows)
{
    foreach (var item1 in list1)
    {
        foreach (var item2 in list2)
        {                
            double r10 = CalculateCorrelation(item1, item2, endDate, 10);
            double r30 = CalculateCorrelation(item1, item2, endDate, 30);

            var dr = results.NewRow();
            dr["Item1"] = item1;
            dr["Item2"] = item2;
            dr["R10"] = r10;
            dr["R30"] = r30;

            finalRowList.Add(dr); //Add to thread-safe collection
        }
    }
}


1 Ответ

1 голос
/ 02 апреля 2020

Проблема может быть связана с этой строкой, которая вызывается в параллельном пути:

var dr = results.NewRow();

Создание DataRow, вероятно, изменяет базовый DataTable, который не является потокобезопасным классом .

Мое предложение состоит в том, чтобы держаться подальше от одновременных сборов и ручного разделения данных, и вместо этого использовать PLINQ , который прост в использовании и затрудняет что-то, что go неправильно :

var resultsList = Variable1List
    .SelectMany(_ => Variable2List, (Item1, Item2) => (Item1, Item2))
    .AsParallel()
    .AsOrdered() // Optional
    .WithDegreeOfParallelism(4) // Optional
    .Select(pair => (
        Item1: pair.Item1,
        Item2: pair.Item2,
        R10: CalculateCorrelation(pair.Item1, pair.Item2, endDate, 10),
        R30: CalculateCorrelation(pair.Item1, pair.Item2, endDate, 30)
    ))
    .ToList();

foreach (var result in resultsList)
{
    var dr = results.NewRow();
    dr["Item1"] = result.Item1;
    dr["Item2"] = result.Item2;
    dr["R10"] = result.R10;
    dr["R30"] = result.R30;
    results.Rows.Add(dr);
}
...