Для проекта мне нужно разобрать несколько довольно больших файлов CSV.Содержимое некоторых записей хранится в базе данных MySQL.Я пытаюсь ускорить это с помощью многопоточности, но до сих пор это только замедляет работу.
Я анализирую файл CSV (до 10 ГБ) и некоторые из этих записей (примерно 5 млн. Из 20 млн. +запись CSV) должна быть вставлена в базу данных MySQL.Чтобы определить, какую запись нужно вставить, мы используем сервер Redis с наборами, которые содержат правильные идентификаторы / ссылки.
Поскольку мы обрабатываем около 30 из этих файлов в любой момент времени, и существуют некоторые зависимости, мы хранимкаждый файл в очереди Resque и несколько серверов, обрабатывающих эти (приоритетные) очереди.
В двух словах:
class Worker
def self.perform(file)
CsvParser.each(file) do |line|
next unless check_line_with_redis(line)
a = ObjectA.find_or_initialize_by_reference(line[:reference])
a.object_bs.destroy_all
a.update_attributes(line)
end
end
Это работает, хорошо масштабируется по горизонтали (больше файлов CSV = больше серверов), но большие файлы CSV создают проблему.В настоящее время у нас есть файлы, которые занимают более 75 часов для такого анализа.Я уже думал о нескольких оптимизациях:
Один из них - сокращение запросов MySQL;мы создаем экземпляры объектов AR, в то время как вставка с простым SQL, если мы знаем идентификатор объекта, выполняется намного быстрее.Таким образом, мы можем, вероятно, избавиться от большей части AR и, возможно, даже от Rails, чтобы таким образом устранить накладные расходы.Мы не можем использовать обычные данные загрузки MySQL, поскольку нам необходимо сопоставить записи CSV с другими объектами, которые могут иметь разные идентификаторы (мы объединяем дюжину устаревших баз данных в новую базу данных).
Другойпытаясь сделать больше в то же время.Существует некоторое время ожидания ввода-вывода, время ожидания сети как для Redis, так и для MySQL, и даже если MRI использует зеленые потоки, это может позволить нам планировать наши запросы MySQL одновременно с чтением ввода-вывода и т. Д. Но с использованием следующего кода:
class Worker
def self.perform(file)
CsvParser.each(file) do |line|
next unless check_line_with_redis(line)
create_or_join_thread(line) do |myLine|
a = ObjectA.find_or_initialize_by_reference(myLine[:reference])
a.object_bs.destroy_all
a.update_attributes(myLine)
end
end
def self.create_or_join_thread(line)
@thread.join if @thread.present?
@thread = Thread.new(line) do |myLine|
yield myLine
end
end
end
Это медленно замедляет процесс.Когда я ps au
запускается на 100% CPU, но со временем он падает до 2-3%.В этот момент он вообще не вставляет новые записи, он просто зависает.
У меня есть strace
d процесс, и сначала я вижу, что запросы MySQL проходят мимо, через некоторое время кажется, что этоне выполняет мой код ruby вообще.Это может быть тупик (завис после анализа строки CSV last , но процесс продолжал работать на 5% CPU и не завершался), или что-то, что я прочитал здесь: http://timetobleed.com/ruby-threading-bugfix-small-fix-goes-a-long-way/
Я использую Rails 2.3.8, REE, 1.8.7-2010.02 в Ubuntu 10.10.Буду очень признателен за понимание того, как обрабатывать большое количество потоков (или, может быть, почему бы вообще не использовать потоки здесь!)