Как ускорить обработку большого CSV с использованием ruby - PullRequest
7 голосов
/ 25 марта 2011

Для проекта мне нужно разобрать несколько довольно больших файлов CSV.Содержимое некоторых записей хранится в базе данных MySQL.Я пытаюсь ускорить это с помощью многопоточности, но до сих пор это только замедляет работу.

Я анализирую файл CSV (до 10 ГБ) и некоторые из этих записей (примерно 5 млн. Из 20 млн. +запись CSV) должна быть вставлена ​​в базу данных MySQL.Чтобы определить, какую запись нужно вставить, мы используем сервер Redis с наборами, которые содержат правильные идентификаторы / ссылки.

Поскольку мы обрабатываем около 30 из этих файлов в любой момент времени, и существуют некоторые зависимости, мы хранимкаждый файл в очереди Resque и несколько серверов, обрабатывающих эти (приоритетные) очереди.

В двух словах:

class Worker
  def self.perform(file)
    CsvParser.each(file) do |line|
      next unless check_line_with_redis(line)
      a = ObjectA.find_or_initialize_by_reference(line[:reference])
      a.object_bs.destroy_all
      a.update_attributes(line)
    end
  end

Это работает, хорошо масштабируется по горизонтали (больше файлов CSV = больше серверов), но большие файлы CSV создают проблему.В настоящее время у нас есть файлы, которые занимают более 75 часов для такого анализа.Я уже думал о нескольких оптимизациях:

Один из них - сокращение запросов MySQL;мы создаем экземпляры объектов AR, в то время как вставка с простым SQL, если мы знаем идентификатор объекта, выполняется намного быстрее.Таким образом, мы можем, вероятно, избавиться от большей части AR и, возможно, даже от Rails, чтобы таким образом устранить накладные расходы.Мы не можем использовать обычные данные загрузки MySQL, поскольку нам необходимо сопоставить записи CSV с другими объектами, которые могут иметь разные идентификаторы (мы объединяем дюжину устаревших баз данных в новую базу данных).

Другойпытаясь сделать больше в то же время.Существует некоторое время ожидания ввода-вывода, время ожидания сети как для Redis, так и для MySQL, и даже если MRI использует зеленые потоки, это может позволить нам планировать наши запросы MySQL одновременно с чтением ввода-вывода и т. Д. Но с использованием следующего кода:

class Worker
  def self.perform(file)
    CsvParser.each(file) do |line|
      next unless check_line_with_redis(line)
      create_or_join_thread(line) do |myLine|
        a = ObjectA.find_or_initialize_by_reference(myLine[:reference])
        a.object_bs.destroy_all
        a.update_attributes(myLine)
      end
    end

    def self.create_or_join_thread(line)
      @thread.join if @thread.present?
      @thread = Thread.new(line) do |myLine|
        yield myLine
      end
    end
  end

Это медленно замедляет процесс.Когда я ps au запускается на 100% CPU, но со временем он падает до 2-3%.В этот момент он вообще не вставляет новые записи, он просто зависает.

У меня есть strace d процесс, и сначала я вижу, что запросы MySQL проходят мимо, через некоторое время кажется, что этоне выполняет мой код ruby ​​вообще.Это может быть тупик (завис после анализа строки CSV last , но процесс продолжал работать на 5% CPU и не завершался), или что-то, что я прочитал здесь: http://timetobleed.com/ruby-threading-bugfix-small-fix-goes-a-long-way/

Я использую Rails 2.3.8, REE, 1.8.7-2010.02 в Ubuntu 10.10.Буду очень признателен за понимание того, как обрабатывать большое количество потоков (или, может быть, почему бы вообще не использовать потоки здесь!)

Ответы [ 2 ]

1 голос
/ 19 октября 2012

Вы можете попробовать обернуть все это в одну транзакцию - это, вероятно, будет иметь значение:

class Worker
  def self.perform(file)
    ObjectA.transaction do 
      CsvParser.each(file) do |line|
        next unless check_line_with_redis(line)
        a = ObjectA.find_or_initialize_by_reference(line[:reference])
        a.object_bs.destroy_all
        a.update_attributes(line)
      end
    end
  end
end

В противном случае каждое сохранение будет заключено в свою собственную транзакцию.Хотя для файла 10 ГБ вы, вероятно, захотите разбить его на фрагменты, скажем, 1000 вставок на транзакцию или что-то в этом роде.

1 голос
/ 25 марта 2011

Есть ли у вас индексы в этих таблицах?

Не могли бы вы временно отключить эти индексы во время массовых вставок?

Перед выполнением массовых вставок мы отключаем индексные ключи:

ALTER TABLE foo DISABLE KEYS

После того, как мы закончим, мы включимключи индекса:

ALTER TABLE foo ENABLE KEYS

Из документов:

ALTER TABLE ... DISABLE KEYS сообщает MySQL о прекращении обновления неуникальных индексов.Затем следует использовать ALTER TABLE ... ENABLE KEYS для повторного создания отсутствующих индексов.MySQL делает это с помощью специального алгоритма, который намного быстрее, чем вставка ключей один за другим, поэтому отключение ключей перед выполнением массовых операций вставки должно значительно ускорить процесс.Использование ALTER TABLE ... DISABLE KEYS требует привилегии INDEX в дополнение к привилегиям, упомянутым ранее.Хотя неуникальные индексы отключены, они игнорируются для операторов, таких как SELECT и EXPLAIN, которые в противном случае использовали бы их.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...