Поток и очередь - PullRequest
       25

Поток и очередь

26 голосов
/ 02 июля 2011

Мне интересно знать, как лучше всего реализовать очередь на основе потоков.

Например:

У меня есть 10 действий, которые я хочу выполнить только с 4 потоками.Я хотел бы создать очередь со всеми 10 действиями, расположенными линейно, и начать первые 4 действия с 4 потоками, как только один из потоков будет выполнен, следующий запустится и т. Д. - Таким образом, за раз количество потоковлибо 4, либо меньше 4.

Ответы [ 6 ]

30 голосов
/ 02 июля 2011

Существует класс Queue в thread в стандартной библиотеке.Используя это, вы можете сделать что-то вроде этого:

require 'thread'

queue = Queue.new
threads = []

# add work to the queue
queue << work_unit

4.times do
  threads << Thread.new do
    # loop until there are no more things to do
    until queue.empty?
      # pop with the non-blocking flag set, this raises
      # an exception if the queue is empty, in which case
      # work_unit will be set to nil
      work_unit = queue.pop(true) rescue nil
      if work_unit
        # do work
      end
    end
    # when there is no more work, the thread will stop
  end
end

# wait until all threads have completed processing
threads.each { |t| t.join }

Причина, по которой я всплываю с неблокирующим флагом, заключается в том, что между until queue.empty? и всплывающим элементом другой поток мог выдвинуть очередь, поэтому, если толькоустановлен флаг неблокирования, мы могли бы застрять в этой строке навсегда.

Если вы используете MRI, интерпретатор Ruby по умолчанию, имейте в виду, что потоки не будут абсолютно параллельными.Если ваша работа связана с процессором, вы можете запустить однопоточный.Если у вас есть какая-то операция, которая блокирует IO, вы можете получить некоторый параллелизм, но YMMV.Кроме того, вы можете использовать интерпретатор, который обеспечивает полный параллелизм, такой как jRuby или Rubinius.

7 голосов
/ 12 августа 2011

There площадь несколько драгоценных камней, которые реализуют этот шаблон для вас; параллельно, персик, и моя называется threach (или jruby_threach под JRuby). Это заменяющая замена для #each, но она позволяет вам указать, сколько потоков следует запускать, используя SizedQueue, чтобы не допустить выхода из-под контроля.

Итак ...

(1..10).threach(4) {|i| do_my_work(i) }

Не толкая свой собственный материал; Есть много хороших реализации там, чтобы сделать вещи проще.

Если вы используете JRuby, jruby_threach - гораздо лучшая реализация - Java просто предлагает гораздо более богатый набор используемых потоковых примитивов и структур данных.

5 голосов
/ 20 декабря 2012

Описательный пример исполняемого файла:

require 'thread'

p tasks = [
    {:file => 'task1'},
    {:file => 'task2'},
    {:file => 'task3'},
    {:file => 'task4'},
    {:file => 'task5'}
]

tasks_queue = Queue.new
tasks.each {|task| tasks_queue << task}

# run workers
workers_count = 3
workers = []
workers_count.times do |n|
    workers << Thread.new(n+1) do |my_n|
        while (task = tasks_queue.shift(true) rescue nil) do
            delay = rand(0)
            sleep delay
            task[:result] = "done by worker ##{my_n} (in #{delay})"
            p task
        end
    end
end

# wait for all threads
workers.each(&:join)

# output results
puts "all done"
p tasks
4 голосов
/ 02 июля 2011

Вы можете использовать пул потоков.Это довольно распространенная модель для такого типа проблем.
http://en.wikipedia.org/wiki/Thread_pool_pattern

Github, кажется, имеет несколько реализаций, которые вы можете попробовать:
https://github.com/search?type=Everything&language=Ruby&q=thread+pool

1 голос
/ 05 июля 2016

Я использую камень под названием work_queue .Это действительно практично.

Пример:

require 'work_queue'
wq = WorkQueue.new 4, 10
(1..10).each do |number|
    wq.enqueue_b("Thread#{number}") do |thread_name|  
        puts "Hello from the #{thread_name}"
    end
end
wq.join
1 голос
/ 03 октября 2014

Целлулоид имеет пример рабочего пула , который делает это.

...