Несколько запросов Нокогири одновременно - PullRequest
3 голосов
/ 23 ноября 2011

У меня есть контроллер, который я пытаюсь получить файлы XML из удаленных источников.

Что-то вроде:

@artist = Nokogiri.XML(open(url).read)

Однако я хочу выполнить несколько из них одновременно, получая разныеданные.Могу ли я использовать темы как-то?

Выполнение одного занимает около 400 мс.Поэтому, когда они выполняются три раза подряд, ответ составляет примерно 1 с +.

Ответы [ 2 ]

5 голосов
/ 23 ноября 2011

Да, вы можете использовать темы:

named_urls = {
  artist: 'http://foo.com/bar',
  song:   'http://foo.com/jim',
  # etc.
}
@named_xmls = {}
one_at_a_time = Mutex.new
named_urls.map do |name,url|
  Thread.new do
    doc = Nokogiri.XML(open(url).read)
    one_at_a_time.synchronize{ @named_xmls[name] = doc }
  end
end.each(&:join)

# At this point @named_xmls will be populated will all Nokogiri documents

Я не уверен, что для записи в разные ключи в общем хэше требуется мьютекс или нет, но это не мешает быть безопасным.

4 голосов
/ 24 сентября 2012

Для большого количества URL вы не можете открыть большое количество потоков, потому что вы будете насыщать пропускную способность вашего соединения и вы начнете получать ошибки соединения.Для моего конкретного кабельного модема и конкретного сервера я обнаружил, что 16 потоков - это хорошее значение.

Я использовал Mathematica , чтобы контролировать и изменять количество потоков в моей программе очистки веб-страниц ruby ​​иконтролировать его производительность для разного количества потоков.Вот результат:

performance vs threads plot

Вместо прямого использования Thread.new я написал функцию-обертку, которая открывает новый поток, только если общее количество потоков меньше настроенного вамимаксимум:

def maybe_new_thread
  File.open('max_threads.cfg', 'r') { |file| @MAX_THREADS =  file.gets.to_i }
  if Thread.list.size < @MAX_THREADS
    Thread.new { yield }
  else
    yield
  end
end

Обратите внимание, что максимальное количество желаемых потоков - это просто число, сохраненное в файле с именем max_threads.cfg, и этот файл читается при каждом вызове функции.Это позволяет вам изменять значение этой переменной при запуске программы.

Общая структура программы выглядит следующим образом:

named_urls = [ 'http://foo.com/bar', (... hundreds of urls ... ),'http://foo.com/jim']
named_urls.each do |url|
  maybe_new_thread do
    doc = Nokogiri.HTML(open(url))
    process_and_insert_in_database(doc)
  end
end

Обратите внимание, что каждый поток сохраняет свой результат вбазы данных, поэтому мне не нужно использовать класс Mutex для координации чего-либо между потоками.

Когда я вставляю в базу данных, я включаю столбец с точным временем, когда вставляется каждый результат.Это очень важно, чтобы вы могли рассчитать производительность, которую вы получаете.Убедитесь, что вы определили этот столбец с поддержкой миллисекунд (я использовал MariaDB 5.3).

Этот код я использовал в Mathematica , чтобы контролировать максимальное количество потоков и строить график в реальном времени.:

named_urls = {
  'http://foo.com/bar', (... hundreds of urls ... ),'http://foo.com/jim',
}
named_urls.each do |url|
  maybe_new_thread do
    doc = Nokogiri.HTML(open(url))
    process_and_insert_in_database(doc)
  end
end

setNumberOfThreads[n_] := Module[{},
  Put[n, "max_threads.cfg"];
  SQLExecute[conn,"DELETE FROM results"]]

operationsPerSecond := SQLExecute[conn,
  "SELECT
     (SELECT COUNT(*) FROM results)/
     (SELECT TIME_TO_SEC(TIMEDIFF((SELECT fin FROM results ORDER BY finishTime DESC LIMIT 1),
                                  (SELECT fin FROM results ORDER BY finishTime      LIMIT 1))))"][[1, 1]];
cops = {};
RunScheduledTask[AppendTo[cops, operationsPerSecond], 2];
Dynamic[ListLinePlot[cops]]

Во время работы, когда вы увидите, что производительность стабильна, вы можете изменить число потоков с помощью setNumberOfThreads[] и увидеть эффект в производительности.

Один финалкомментарий.Вместо использования метода open-uri напрямую, я использую эту оболочку, так что в случае ошибок она повторяется автоматически:

def reliable_open(uri)
  max_retry = 10
  try_counter = 1
  while try_counter < max_retry
    begin
      result = open(uri)
      return result
    rescue
      puts "Error when trying to open #{uri}"
      try_counter += 1
      sleep try_counter * 10
    end
  end
  raise "Imposible to open after #{max_retry} retries"
end
...