Обработка плоского файла кусками с использованием нескольких потоков с использованием шаблона «производитель / потребитель» и SqlBulkCopy в БД SQL Server - PullRequest
4 голосов
/ 14 января 2010

Я надеюсь, что вы будете терпеть меня. Я хотел предоставить как можно больше информации. Основная проблема заключается в том, как создать структуру (например, стек), которая будет использоваться несколькими потоками, которые будут извлекать значение и использовать его для обработки одного большого плоского файла и, возможно, повторять циклы снова и снова, пока не будет обработан весь файл. Если файл содержит 100 000 записей, которые могут быть обработаны 5 потоками, используя 2000 строк тогда каждый поток получит 10 блоков для обработки.

Моя цель - переместить данные в плоский файл (с заголовком ... Subheader ... Detail, Detail, Detail, ... Detail, SubFooter, Subheader ... Detail, Detail, Detail, ... Detail SubFooter, Subheader ... Detail, Detail, Detail, ... Detail, SubFooter, структура нижнего колонтитула) в БД OLTP, которая имеет режим восстановления в Простое (возможно полное) в 3 таблицы: первая представляет уникальный ключ подзаголовка, присутствующий в строке подзаголовка, вторая - промежуточное Таблица SubheaderGroup, представляющая группировку строк подробностей в порциях из 2000 записей (должна иметь идентификатор PK подзаголовка в качестве своего FK и 3-я, представляющая строки подробностей с FK, указывающим на PK подзаголовка.

Я занимаюсь ручным управлением транзакциями, поскольку могу иметь десятки тысяч строк Detail. и я использую специальное поле, которое установлено в 0 в таблицах назначения во время загрузки, а затем в конце обработки файла я выполняю обновление транзакций, меняя это значение на 1, что может сигнализировать другому приложению, что загрузка завершена.

Я хочу нарезать этот плоский файл на несколько равных частей (одинаковое количество строк), которые можно обработать несколькими потоками и импортировать с помощью SqlBulkCopy с использованием IDataReader, созданного из метаданных таблицы назначения).

Я хочу использовать шаблон производителя / потребителя (как описано в ссылке ниже - анализ PDF и пример кода) для использования SqlBulkCopy с опцией SqlBulkCopyOptions.TableLock. http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx Этот шаблон позволяет создавать несколько производителей, и эквивалентное количество потребителей должно подписаться на производителей, чтобы использовать строку.

В проекте TestSqlBulkCopy в файле DataProducer.cs есть метод, который имитирует создание тысяч записей.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

Этот метод будет выполнен в контексте нового потока. Я хочу, чтобы этот новый поток читал только уникальный фрагмент исходного плоского файла, а другой поток будет непосредственно обрабатывать следующий фрагмент. Затем потребители будут перемещать данные (которые передаются им) в базу данных SQL Server с использованием класса SqlBulkCopy ADO.NET.

Таким образом, вопрос заключается в том, чтобы основная программа диктовала, какой lineFrom к lineTo должен обрабатываться каждым потоком, и я думаю, что это должно происходить при создании потока. Второе решение, вероятно, состоит в том, чтобы потоки разделяли некоторую структуру и использовали что-то уникальное для них (например, номер потока или порядковый номер) для поиска общей структуры (возможно, стека и извлечения значения (блокировки стека при этом)), а затем следующий поток будет затем выберите следующее значение. Основная программа выберет плоский файл и определит размер фрагментов и создаст стек.

Так может ли кто-нибудь предоставить фрагменты кода, псевдокод о том, как несколько потоков будут обрабатывать один файл и получать только уникальную часть этого файла?

Спасибо, Rad

1 Ответ

3 голосов
/ 14 января 2010

Что для меня хорошо, так это использование очереди для хранения необработанной работы и словаря для отслеживания работы на борту:

  1. Создать рабочий класс, который принимает имя файла, начальная строка и количество строк и имеет метод обновления, который делает вставку базы данных. Передайте метод обратного вызова, который работник использует, чтобы сигнализировать, когда это сделано.
  2. Загрузка очереди с экземплярами работника класс, по одному на каждый кусок.
  3. Создать поток диспетчера, который удаляет рабочий экземпляр, запускает свое обновление метод, и добавляет рабочий экземпляр в словарь, ключом которого является ManagedThreadId его потока. Сделай это пока ваш максимально допустимый поток счет достигнут, как отмечено Dictionary.Count. Диспетчер ждет окончания потока а затем запускает другой. Есть несколько способов подождать.
  4. По завершении каждого потока его обратный вызов удаляет его ManagedThreadId из Толковый словарь. Если поток выходит из-за ошибки (например, время ожидания соединения), затем обратный вызов может повторно вставить рабочий в очередь. Это хорошее место обновить ваш интерфейс.
  5. Ваш пользовательский интерфейс может отображать активные потоки, общий прогресс и время на чанк. Он может позволить пользователю регулировать количество активных потоков, приостанавливать обработку, отображать ошибки или останавливаться раньше.
  6. Когда очередь и словарь пусты, все готово.

Демонстрационный код в виде консольного приложения:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...