Избавление от нити в рубине или юри - PullRequest
3 голосов
/ 10 января 2012

У меня есть подписчик очереди rabbitmq, который раскручивает новый поток каждый раз, когда потребляется новое сообщение:

AMQP.start(@conf) do |connection|
  channel = AMQP::Channel.new(connection)

  requests_queue = channel.queue("one")

  requests_queue.subscribe(:ack => true) do |header, body|
    puts "we have a message at #{Time.now} and n is #{n}"

    url_search = MultiJson.decode(body)

    Thread.new do
      5.times do
        lead = get_lead(n, (n == 5))

        puts "message #{n} is_last = #{lead.is_last} at #{Time.now}";

        AMQP::Exchange.default.publish(
                                        MultiJson.encode(lead), 
                                        :routing_key => header.reply_to,
                                        :correlation_id => header.correlation_id
                                      )

        n += 1
        sleep(2)
      end
    end
  end
end

У меня вопрос: как избавиться от темы после обработки сообщения? Должен ли я использовать пул потоков?

Я использую JRuby. Создает ли приведенный выше код поток Java JVM за кулисами, используя обычный синтаксис ruby, или я должен явно создавать поток Java?

1 Ответ

1 голос
/ 10 января 2012

Вам не нужно вручную удалять поток, я думаю, и вы должны использовать потоки ruby, исходя из того, что я понял, что они являются потоками java в jruby, а именно из того, что jruby получает, это хорошая производительность.

Обычная вещь, которую нужно сделать, - это раскрутить пару потоков, а затем присоединиться ко всем, прежде чем продолжить, если вы хотите быть уверены, что все завершены до следующего шага, но здесь это не требуется.

Вот небольшая тестовая программа:

# foo.rb
a = Thread.new { print "a"; sleep(1); print "b"; print "c" }
require 'pp'
pp Thread.list
puts "foo"
sleep(2); 
pp Thread.list
puts "bar"

Как видите, порожденная фоновая нить автоматически удаляется. (Проверено в jruby, а также 1.9.2

$ ruby foo.rb 
a[#<Thread:0x00000100887678 run>, #<Thread:0x0000010086c7d8 sleep>]
foo
bc[#<Thread:0x00000100887678 run>]
bar
...