Потокобезопасный буфер данных для пакетных вставок контролируемого размера - PullRequest
4 голосов
/ 03 августа 2011

У меня есть симуляция, которая генерирует данные, которые должны быть сохранены в базе данных.

ParallelLoopResult res = Parallel.For(0, 1000000, options, (r, state) =>
{
    ComplexDataSet cds = GenerateData(r);

    SaveDataToDatabase(cds);

});

Симуляция генерирует большое количество данных, поэтому было бы нецелесообразно сначала генерировать их, а затем сохранять ихв базу данных (до 1 ГБ данных), и также не имеет смысла сохранять ее в базе данных один за другим (слишком маленькие трансформации для практического применения).Я хочу вставить их в базу данных как пакетную вставку с контролируемым размером (скажем, 100 с одним коммитом).

Однако я думаю, что мои знания в области параллельных вычислений меньше теоретических.Я придумал это (что, как вы видите, очень некорректно):

DataBuffer buffer = new DataBuffer(...);

ParallelLoopResult res = Parallel.For(0, 10000000, options, (r, state) =>
{
    ComplexDataSet cds = GenerateData(r);

    buffer.SaveDataToBuffer(cds, i == r - 1);

});

public class DataBuffer
{
    int count = 0;
    int limit = 100

    object _locker = new object();

    ConcurrentQueue<ConcurrentBag<ComplexDataSet>> ComplexDataBagQueue{ get; set; }

    public void SaveDataToBuffer(ComplexDataSet data, bool isfinalcycle)
    {
            lock (_locker)
            {
                if(count >= limit)
                {
                    ConcurrentBag<ComplexDataSet> dequeueRef;
                    if(ComplexDataBagQueue.TryDequeue(out dequeueRef))
                    {
                        Commit(dequeueRef);
                    }

                    _lastItemRef = new ConcurrentBag<ComplexDataSet>{data};
                    ComplexDataSetsQueue.Enqueue(_lastItemRef);
                    count = 1;
                }
                else
                {
                    // First time
                    if(_lastItemRef == null)
                    {
                        _lastItemRef = new ConcurrentBag<ComplexDataSet>{data};
                        ComplexDataSetsQueue.Enqueue(_lastItemRef);
                        count = 1;
                    }
                    // If buffer isn't full
                    else
                    {
                        _lastItemRef.Add(data);
                        count++;
                    }
                }

                if(isfinalcycle)
                {
                        // Commit everything that hasn't been committed yet
                        ConcurrentBag<ComplexDataSet> dequeueRef;    
                    while (ComplexDataSetsQueue.TryDequeue(out dequeueRef))
                    {
                        Commit(dequeueRef);
                    }
                }
            }
    }

    public void Commit(ConcurrentBag<ComplexDataSet> data)
    {
        // Commit data to database..should this be somehow in another thread or something ?
    }
}

Как вы видите, я использую очередь для создания буфера, а затем вручную решаю, когда фиксировать.Однако у меня есть сильное чувство, что это не очень эффективное решение моей проблемы.Во-первых, я не уверен, правильно ли я блокирую.Во-вторых, я не уверен, даже если это полностью поточно-ориентированный (или вообще).

Не могли бы вы взглянуть на минуту и ​​прокомментировать, что мне делать по-другому?Или, если есть совершенно лучший способ сделать это (используя какую-то технику Продюсер-Потребитель или что-то в этом роде)?

Спасибо и наилучшие пожелания, D.

Ответы [ 3 ]

4 голосов
/ 04 августа 2011

Нет необходимости использовать блокировки или дорогие, безопасные для параллелизма структуры данных. Все данные независимы, поэтому введение блокировки и совместного использования только ухудшит производительность и масштабируемость.

Parallel.For имеет перегрузку, которая позволяет вам указывать данные для каждого потока. В этом вы можете хранить частную очередь и частную связь с базой данных.

Также: Parallel.For внутренне разделяет ваш диапазон на более мелкие куски. Совершенно эффективно пройти огромный диапазон, поэтому ничего не изменится.

Parallel.For(0, 10000000, () => new ThreadState(),
    (i, loopstate, threadstate) =>
{
    ComplexDataSet data = GenerateData(i);

    threadstate.Add(data);

    return threadstate;
}, threadstate => threadstate.Dispose());

sealed class ThreadState : IDisposable
{
    readonly IDisposable db;
    readonly Queue<ComplexDataSet> queue = new Queue<ComplexDataSet>();

    public ThreadState()
    {
        // initialize db with a private MongoDb connection.
    }

    public void Add(ComplexDataSet cds)
    {
        queue.Enqueue(cds);

        if(queue.Count == 100)
        {
            Commit();
        }
    }

    void Commit()
    {
        db.Write(queue);
        queue.Clear();
    }

    public void Dispose()
    {
        try
        {
            if(queue.Count > 0)
            {
                Commit();
            }
        }
        finally
        {
            db.Dispose();
        }
    }
}

Теперь MongoDb в настоящее время не поддерживает действительно параллельные вставки - он удерживает некоторые дорогостоящие блокировки на сервере, поэтому параллельные коммиты не дадут вам большой (если вообще) скорости. Они хотят исправить это в будущем, так что вы можете получить бесплатное ускорение в один прекрасный день.

Если вам нужно ограничить количество соединений с базой данных, хорошей альтернативой является настройка производителя / потребителя. Вы можете использовать очередь BlockingCollection, чтобы сделать это эффективно без каких-либо блокировок:

// Specify a maximum of 1000 items in the collection so that we don't
// run out of memory if we get data faster than we can commit it.
// Add() will wait if it is full.

BlockingCollection<ComplexDataSet> commits =
    new BlockingCollection<ComplexDataSet>(1000);

Task consumer = Task.Factory.StartNew(() =>
    {
        // This is the consumer.  It processes the
        // "commits" queue until it signals completion.

        while(!commits.IsCompleted)
        {
            ComplexDataSet cds;

            // Timeout of -1 will wait for an item or IsCompleted == true.

            if(commits.TryTake(out cds, -1))
            {
                // Got at least one item, write it.
                db.Write(cds);

                // Continue dequeuing until the queue is empty, where it will
                // timeout instantly and return false, or until we've dequeued
                // 100 items.

                for(int i = 1; i < 100 && commits.TryTake(out cds, 0); ++i)
                {
                    db.Write(cds);
                }

                // Now that we're waiting for more items or have dequeued 100
                // of them, commit.  More can be continue to be added to the
                // queue by other threads while this commit is processing.

                db.Commit();
            }
        }
    }, TaskCreationOptions.LongRunning);

try
{
    // This is the producer.

    Parallel.For(0, 1000000, i =>
        {
            ComplexDataSet data = GenerateData(i);
            commits.Add(data);
        });
}
finally // put in a finally to ensure the task closes down.
{
    commits.CompleteAdding(); // set commits.IsFinished = true.
    consumer.Wait(); // wait for task to finish committing all the items.
}
1 голос
/ 03 августа 2011

В вашем примере у вас есть 10 000 000 пакетов работ.Каждый из этого должен быть распределен в поток.Предполагая, что у вас нет действительно большого количества ядер процессора, это не оптимально.Вы также должны синхронизировать свои потоки при каждом вызове buffer.SaveDataToBuffer (используя блокировки).Кроме того, вы должны знать, что переменная r не обязательно увеличивается на единицу в хронологическом представлении (пример: Thread1 выполняет r с 1,2,3 и Thread2 с 4,5,6. Хронологически это приведет к следующемупоследовательность r передается в SaveDataToBuffer 1,4,2,5,3,6 (приблизительно).

Я бы увеличил пакеты работы, а затем зафиксировал бы каждый пакет сразу.Это также дает то преимущество, что вам не нужно часто блокировать / синхронизировать все.

Вот пример:

int total = 10000000;
int step = 1000;

Parallel.For(0, total / step, (r, state) =>
{
    int start = r * start;
    int end = start + step;

    ComplexDataSet[] result = new ComplexDataSet[step];

    for (int i = start; i < end; i++)
    {
        result[i - start] = GenerateData(i);
    }

    Commit(result);
});

В этом примере вся работа разбита на 10 000 пакетов(которые выполняются параллельно), и каждый пакет генерирует 1000 элементов данных и фиксирует их в базе данных.

При таком решении метод Commit может стать узким местом, если не разработан с умом.Лучше всего сделать его безопасным для потоков без использования каких-либо блокировок.Это может быть достигнуто, если вы не используете общие объекты между потоками, которые нуждаются в синхронизации.

Например, для серверной части SQL Server это будет означать создание собственного подключения SQL в контексте каждого Commit() вызова:

private void Commit(ComplexDataSet[] data)
{
    using (var connection = new SqlConnection("connection string..."))
    {
        connection.Open();

        // insert your data here...
    }
}
1 голос
/ 03 августа 2011

Вместо увеличения сложности программного обеспечения, рассмотрим упрощение. Вы можете изменить код на три части:

  1. Рабочие, которые ставят в очередь

    Это одновременный GenerateData в Parallel.For , который выполняет тяжелые вычисления и производит ComplexDataSet .

  2. Фактическая очередь

    Параллельная очередь, в которой хранятся результаты из [1] - столько ComplexDataSet . Здесь я предположил, что один экземпляр ComplexDataSet на самом деле не очень ресурсоемкий и достаточно легкий. Пока очередь является параллельной, она будет поддерживать параллельные операции «вставки» и «удаления».

  3. Рабочие, которые вытесняют

    Код, который берет один экземпляр ComplexDataSet из очереди обработки [2] и помещает его в параллельную сумку (или другое хранилище). Как только в сумке будет N количество предметов, которые вы блокируете, прекратите удаление из очереди, сбросьте содержимое сумки в базу данных и очистите ее. Наконец, вы разблокируете и возобновляете снятие очереди.

Вот некоторый метакод (он все еще компилируется, но нуждается в улучшении)

[1]

// [1] - Class is responsible for generating complex data sets and 
// adding them to processing queue
class EnqueueWorker
{
    //generate data and add to queue
    internal void ParrallelEnqueue(ConcurrentQueue<ComplexDataSet> resultQueue)
    {
        Parallel.For(1, 10000, (i) =>
        {
            ComplexDataSet cds = GenerateData(i);
            resultQueue.Enqueue(cds);

        });
    }

    //generate data
    ComplexDataSet GenerateData(int i)
    {
        return new ComplexDataSet();
    }
}
* * Тысяча сорок-одина [3]
//[3] This guy takes sets from the processing queue and flush results when 
// N items have been generated
class DequeueWorker
{
    //buffer that holds processed dequeued data
    private static ConcurrentBag<ComplexDataSet> buffer;

    //lock to flush the data to the db once in a while
    private static object syncRoot = new object();

    //take item from processing queue and add it to internal buffer storage
    //once buffer is full - flush it to the database
    internal void ParrallelDequeue(ConcurrentQueue<ComplexDataSet> resultQueue)
    {
        buffer = new ConcurrentBag<ComplexDataSet>();
        int N = 100;

        Parallel.For(1, 10000, (i) =>
        {
            //try dequeue
            ComplexDataSet cds = null;

            var spinWait = new SpinWait();

            while (cds == null)
            {
                resultQueue.TryDequeue(out cds);
                spinWait.SpinOnce();
            }

            //add to buffer
            buffer.Add(cds);

            //flush to database if needed
            if (buffer.Count == N)
            {
                lock (syncRoot)
                {
                    IEnumerable<ComplexDataSet> data = buffer.ToArray();

                    // flush data to database

                    buffer = new ConcurrentBag<ComplexDataSet>();
                }
            }

        });
    }        
}

[2] и использование

class ComplexDataSet { }

class Program
{
    //processing queueu - [2]
    private static ConcurrentQueue<ComplexDataSet> processingQueue;

    static void Main(string[] args)
    {
        // create new processing queue - single instance for whole app
        processingQueue = new ConcurrentQueue<ComplexDataSet>();

        //enqueue worker
        Task enqueueTask = Task.Factory.StartNew(() =>
            {
                EnqueueWorker enqueueWorker = new EnqueueWorker();
                enqueueWorker.ParrallelEnqueue(processingQueue);
            });

        //dequeue worker
        Task dequeueTask = Task.Factory.StartNew(() =>
        {
            DequeueWorker dequeueWorker = new DequeueWorker();
            dequeueWorker.ParrallelDequeue(processingQueue);
        });            
    }
}
...