Это Ruby код, использующий потоки, пулы потоков и параллелизм правильно - PullRequest
1 голос
/ 14 февраля 2020

Я то, что я сейчас рассматриваю как третью часть выполнения задачи проверки связи с очень большим списком URL-адресов (количество которых исчисляется тысячами) и получения сертификата x509 URL-адреса, связанного с ним. Часть 1 здесь (Как правильно использовать потоки для проверки связи с URL) , а часть 2 здесь (Почему мой пул соединений не реализует мой код потока) .

Поскольку я задал эти два вопроса, у меня теперь появился следующий код:

###### This is the code that pings a url and grabs its x509 cert #####

class SslClient
  attr_reader :url, :port, :timeout

  def initialize(url, port = '443')
    @url = url
    @port = port
  end

  def ping_for_certificate_info
    context = OpenSSL::SSL::SSLContext.new
    tcp_client = TCPSocket.new(url, port)
    ssl_client = OpenSSL::SSL::SSLSocket.new tcp_client, context
    ssl_client.hostname = url
    ssl_client.sync_close = true
    ssl_client.connect
    certificate = ssl_client.peer_cert
    verify_result = ssl_client.verify_result
    tcp_client.close
    {certificate: certificate, verify_result: verify_result }
  rescue => error
    {certificate: nil, verify_result: nil }
  end
end

Приведенный выше код имеет первостепенное значение при получении ssl_client.peer_cert. Ниже приведен следующий код, представляющий собой фрагмент кода, который отправляет несколько HTTP-запросов на URL-адреса своих сертификатов:

  pool = Concurrent::CachedThreadPool.new
  pool.post do
    [LARGE LIST OF URLS TO PING].each do |struct|
       ssl_client = SslClient.new(struct.domain.gsub("*.", "www."), struct.scan_port)
       cert_info = ssl_client.ping_for_certificate_info
       struct.x509_cert = cert_info[:certificate]
       struct.verify_result = cert_info[:verify_result]
     end
   end

   pool.shutdown
   pool.wait_for_termination

   #Do some rails code with the database depending on the results.

Пока я запускаю этот код, он невероятно медленный. Я думал, что при создании пула потоков с потоками код будет go намного быстрее. Это не так, и я не уверен почему. Во многом это потому, что я не знал нюансов потоков, пулов, голодания, блокировок и т. Д. c. Однако после реализации приведенного выше кода я прочитал еще несколько, чтобы попытаться ускорить его, и снова я запутался и мог бы использовать некоторые пояснения относительно того, как я могу сделать код быстрее.

Для начала, в эта превосходная статья здесь (ruby-параллелизм-параллелизм) . Мы получаем следующие определения и понятия:

Параллелизм против параллелизма. Эти термины используются свободно, но имеют разные значения.

Параллелизм : искусство делать много задач, по одному за раз. При быстром переключении между ними пользователю может показаться, что они происходят одновременно. Параллелизм : Выполнение множества задач буквально в одно и то же время. Вместо одновременного появления они являются одновременными. Параллельность чаще всего используется для приложений с интенсивным вводом-выводом. Например, веб-приложение может регулярно взаимодействовать с базой данных или выполнять множество сетевых запросов. Используя параллелизм, мы можем поддерживать отзывчивость нашего приложения, даже пока мы ожидаем ответа базы данных на наш запрос.

Это возможно, поскольку виртуальная машина Ruby позволяет другим потокам работать, пока один ожидает во время ввода-вывода. Даже если программе приходится делать десятки запросов, если мы используем параллелизм, запросы будут выполняться практически в одно и то же время.

Параллелизм, с другой стороны, в настоящее время не поддерживается Ruby.

Итак, из этой части статьи я понимаю, что то, что я хочу сделать, нужно делать одновременно, потому что я пингую URL-адреса в сети и что Параллелизм в настоящее время не поддерживается Ruby.

Следующее - то, где вещи запутываются для меня. Из моего вопроса части 1 о переполнении стека я узнал следующее в комментарии, что мне следует сделать следующее:

Использовать пул потоков; не просто создать тысячу одновременных потоков. Для чего-то вроде подключения к URL, где будет много ожидания, вы можете переподписать количество потоков на ядро ​​ЦП, но не на огромное количество. Вам придется поэкспериментировать.

Другой пользователь говорит так:

Вы бы не породили тысячи потоков, используйте пул соединений (например, * 1052). *), поэтому у вас есть максимум 20-30 одновременных запросов (это максимальное число должно быть определено путем тестирования, в какой момент производительность сети падает, и вы получаете эти тайм-ауты)

Так что для этой части, Я переключился на concurrent-ruby и реализовал как CachedThreadPool, так и FixedThreadPool с 10 потоками. Я выбрал `CachedThreadPool, потому что мне казалось, что количество необходимых потоков позаботится обо мне из Threadpool. Теперь в параллельной документации ruby для пула я вижу это:

pool = Concurrent::CachedThreadPool.new
pool.post do
  # some parallel work
end

Я думал, что мы только что установили в первой статье, что параллелизм не поддерживается в Ruby, так что делает пул потоков? Это работает одновременно или параллельно? Что именно происходит? Нужен ли мне пул потоков или нет? Также на тот момент я думал, что пулы соединений и пулы потоков одинаково используются просто взаимозаменяемо. В чем разница между двумя пулами и какой мне нужен?

В другой замечательной статье Как выполнять параллельные HTTP-запросы в Ruby и Rails , эта статья представляет Concurrent::Promises одновременная форма класса ruby, чтобы избежать блокировок и обеспечить безопасность потоков с двумя вызовами API. Ниже приведен фрагмент кода со следующим описанием:

def get_all_conversations
  groups_thread = Thread.new do
    get_groups_list
  end

  channels_thread = Thread.new do
    get_channels_list
  end

  [groups_thread, channels_thread].map(&:value).flatten
end

Каждый запрос выполняется в своем собственном потоке, который может выполняться параллельно, потому что это блокирующий ввод / вывод. Но видите ли вы здесь уловку?

В приведенном выше коде еще одно упоминание о параллелизме, которое мы только что сказали, не существовало в ruby. Ниже приведен подход с Concurrent::Promise

def get_all_conversations
  groups_promise = Concurrent::Promise.execute do
    get_groups_list
  end

  channels_promise = Concurrent::Promise.execute do
    get_channels_list
  end

  [groups_promise, channels_promise].map(&:value!).flatten
end

Таким образом, согласно этой статье, эти запросы выполняются «параллельно». Мы все еще говорим о параллелизме на этом этапе?

Наконец, в этих двух статьях говорится об использовании Futures для одновременных http-запросов. Я не буду вдаваться в подробности go, но вставлю здесь ссылки.

1. Использование Concurrent Ruby в Ruby в приложении Rails 2. Изучите параллелизм путем реализации Futures в Ruby

Опять же, то, о чем говорилось в статье, выглядит для меня как функциональность Concurrent::Promise. Я просто хочу отметить, что примеры показывают, как использовать концепции для двух разных вызовов API, которые необходимо объединить. Это не то, что мне нужно. Мне просто нужно сделать тысячи вызовов API быстрыми и записать результаты.

В заключение я просто хочу знать, что мне нужно сделать, чтобы мой код стал быстрее и безопаснее с точки зрения потоков, чтобы он работал параллельно. Что именно мне не хватает, чтобы сделать код go более быстрым, потому что сейчас он идет так медленно, что я мог бы с самого начала не использовать потоки.

Резюме

У меня есть пинговать тысячи URL-адресов, используя потоки, чтобы ускорить процесс. Код работает медленно, и я запутался, если правильно использую потоки, пулы потоков и параллелизм.

1 Ответ

0 голосов
/ 14 февраля 2020

Давайте посмотрим на проблемы, которые вы описали, и попробуем решить их по одному:

У вас есть два фрагмента кода, SslClient и скрипт, который использует этот клиент ssl. Из моего понимания пула потоков, способ использования вами пула потоков должен быть немного изменен.

С:

pool = Concurrent::CachedThreadPool.new
pool.post do
 [LARGE LIST OF URLS TO PING].each do |struct|
    ssl_client = SslClient.new(struct.domain.gsub("*.", "www."), struct.scan_port)
    cert_info = ssl_client.ping_for_certificate_info
    struct.x509_cert = cert_info[:certificate]
    struct.verify_result = cert_info[:verify_result]
  end
end

pool.shutdown
pool.wait_for_termination

на:

pool = Concurrent::FixedThreadPool.new(10) 

[LARGE LIST OF URLS TO PING].each do | struct |
  pool.post do 
   ssl_client = SslClient.new(struct.domain.gsub("*.", "www."), struct.scan_port)
   cert_info = ssl_client.ping_for_certificate_info
   struct.x509_cert = cert_info[:certificate]
   struct.verify_result = cert_info[:verify_result]
  end
end

pool.shutdown
pool.wait_form

In В исходной версии в пуле размещена только одна единица работы. Во второй версии мы отправляем в пул столько единиц работы, сколько есть элементов в LARGE LIST OF URLS TO PING.

Чтобы добавить немного больше о параллелизме и параллелизме в Ruby, верно, что Ruby не поддерживает истинный параллелизм из-за GIL (Global Interpreter Lock), но это применимо только тогда, когда мы фактически выполняем какую-либо работу с процессором. В случае сетевого запроса длительность работы, связанной с ЦП, очень мала по сравнению с работой, связанной с вводом-выводом, что означает, что ваш сценарий использования является очень хорошим кандидатом на использование потоков.

Кроме того, используя пул потоков, мы можем минимизировать накладные расходы, связанные с созданием потоков процессором. Когда мы используем пул потоков, как в случае с Concurrent :: FixedThreadPool.new (10), мы буквально ограничиваем количество потоков, доступных в пуле, для несвязанного пула потоков новые потоки создаются каждый раз, когда модуль работы присутствует, но остальные потоки в пуле заняты.

В первой статье необходимо было собрать результат, возвращаемый каждым отдельным работником, а также действовать значимо в случае исключения (я автор). Вы должны быть в состоянии использовать класс, указанный в этом блоге, без каких-либо изменений.

Давайте попробуем переписать ваш код, используя Concurrent :: Future, поскольку и в вашем случае нам нужны результаты.


thread_pool = Concurrent::FixedThreadPool.new(20)

executors = [LARGE LIST OF URLS TO PING].map do | struct |
  Concurrent::Future.execute({ executor: thread_pool }) do
    ssl_client = SslClient.new(struct.domain.gsub("*.", "www."), struct.scan_port)
    cert_info = ssl_client.ping_for_certificate_info
    struct.x509_cert = cert_info[:certificate]
    struct.verify_result = cert_info[:verify_result]
    struct
  end
end

executors.map(&:value)

Надеюсь, это поможет. В случае вопросов, пожалуйста, задавайте в комментариях, я буду изменять эту запись, чтобы ответить на них.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...