Наиболее эффективная многопоточная фильтрация дубликатов объектов (с использованием пользовательского ключа)? - PullRequest
0 голосов
/ 29 февраля 2012

Я пишу сервисный клиент, который вызывает огромные строки с разделителями, содержащие отдельные записи из удаленного сервиса.Из-за размера этих строк я делю вызовы удаленной службы на порции (диапазоны дат) и параллельно выполняю циклы по диапазонам дат, чтобы вызывать удаленную службу и анализировать данные.Проблема в том, что 50% + записей являются дубликатами, поэтому я хочу отфильтровать их ...

Вот мой текущий подход:

// We want to filter out duplicate markets by using the MarketId field...
HashSet<ParsedMarketData> exchangeFixtures = 
    new HashSet<ParsedMarketData>(
        new GenericEqualityComparer<ParsedMarketData, int>(pmd => pmd.MarketId));

DateTime[][] splitTimes = 
    SplitDateRange(startDate, endDate, TimeSpan.FromDays(1));

// Effectively a Tasks.Parallel.ForEach call...
_parallel.ForEach(splitTimes, startEndTime =>
{
    DateTime start = startEndTime[0];
    DateTime end = startEndTime[1];

    string marketDataString = remoteServiceProxy.GetMarketData(start, end);
    IEnumerable<ParsedMarketData> rows = 
        _marketDataParser.ParseMarketData(marketDataString);

    foreach (ParsedMarketData marketDataRow in rows)
    {
        lock (_syncObj)
        {
            // Ignore the return value as we don't care 
            // if it gets added or not...
            marketDataList.Add(exchangeFixture);
        }
    }
});

По сути, это заблокированная структура данных (котораянаходит дубликаты) наиболее эффективный подход к этой проблеме или его можно улучшить?

Возможно, стоит знать, что большинство (95% +) «дубликатов» встречаются в каждой временной шкале.Т.е. если мы будем извлекать «день A» и «день B» параллельно, между днем ​​A и днем ​​B не будет много (или вообще ничего) дубликатов (но много в каждом дне - и в моем решении каждый поток).

Ответы [ 2 ]

1 голос
/ 29 февраля 2012

Вам нужно будет настроить свой код, чтобы воспользоваться возможностями параллелизма в данных и сервисе. Звучит как одна нить в день может быть вариант.

На самом деле увидеть улучшение должно быть редко. Многократные потоки покупают больше циклов процессора, а не больше интернет-соединений, сетевых карт или сервисных машин. Высоки шансы, что оптимально всего два потока. Один для получения данных из сервиса, другой для его обработки. Позволяя перекрывать эти две операции, между ними создается потокобезопасная очередь производителя / потребителя. Вы можете получить выгоду только от большего количества потоков, если поток обработки требует больше времени, чем поток извлечения данных. Также сценарий, который позволяет вам легко профилировать код, вы можете ускорить обработку, но не поиск. Вам даже не нужен профилировщик для первой оценки. Если поток обработки данных не сжигает ядро ​​на 100%, то все готово.

0 голосов
/ 01 марта 2012

Учитывая данные, и высокую вероятность дублирования в каждом потоке (и низкую вероятность дублирования между потоками), я решил использовать следующее решение: оно позволяет каждому потоку выполнять свою работу без помех от блокировок и выполняет небольшую фильтрацию. в конце потока вызывающего абонента, чтобы убедиться, что фильтрация выполнена правильно.

Это также имеет дополнительное преимущество, заключающееся в том, что порядок, в котором объекты возвращаются из вызовов службы (порядок даты), поддерживается между потоками, поэтому нет необходимости сортировать его в конце.

public IEnumerable<Stuff> GetStuffs(DateTime startDate, DateTime endDate)
{
    if (startDate >= endDate)
        throw new ArgumentException("startDate must be before endDate", "startDate");

    IDateRange dateRange = new DateRange(startDate, endDate);

    IDateRange[] dateRanges = _dateRangeSplitter.DivideRange(dateRange, TimeSpan.FromDays(1)).ToArray();

    IEnumerable<Stuff>[] resultCollections = new IEnumerable<Stuff>[dateRanges.Length];

    _parallel.For(0, dateRanges.Length, i =>
    {
        IDateRange splitRange = dateRanges[i];

        IEnumerable<Stuff> stuffs = GetMarketStuffs(splitRange);

        resultCollections[i] = stuffs;
    });

    Stuff[] marketStuffs = resultCollections.SelectMany(ef => ef).Distinct(ef => ef.EventId).ToArray();

    return marketStuffs;
}

private IEnumerable<Stuff> GetMarketStuffs(IDateRange splitRange)
{
    IList<Stuff> stuffs = new List<Stuff>();
    HashSet<int> uniqueStuffIds = new HashSet<int>();

    string marketStuffString = _slowStuffStringProvider.GetMarketStuffs(splitRange.Start, splitRange.End);

    IEnumerable<ParsedStuff> rows = _stuffParser.ParseStuffString(marketStuffString);

    foreach (ParsedStuff parsedStuff in rows)
    {
        if (!uniqueStuffIds.Add(parsedStuff.EventId))
        {
            continue;
        }

        stuffs.Add(new Stuff(parsedStuff));
    }
    return stuffs;
}
...