Как обработать элементы в потоках, выбрасывая исключения в ruby? - PullRequest
0 голосов
/ 11 ноября 2018

Я обрабатываю элементы с потоками и очередями, но иногда выдается исключение для определенного элемента, поэтому он не обрабатывается.

Моя идея состоит в том, чтобы поместить нарушающий элемент обратно в очередь, чтобы его можно было снова обработать, и продолжайте помещать его обратно до тех пор, пока исключение больше не будет выброшено.

Несмотря на мое намерение повторно обрабатывать элементы, этот код обрабатывает очередь только один раз:

#assume this queue is immediately filled with items
item_queue = Queue.new 

define_method (:processItem) {|item|
  begin
    #do something with item
  #Bad style below: will work in specific exception handling later
  rescue Exception => ex 
    #something happened, so put it back in the queue
    item_queue << item
    return
  end
  #more processing here, if 'begin' was successful
}

threads = []    

until item_queue.empty?
  threads << Thread.new{ processItem(item_queue.pop) }
end

threads.each{|thread| thread.join}

Я думал, что Queue поточно-ориентирован, поэтому его можно использовать вот так - но результаты показывают иначе.

Как я могу обеспечить повторную обработку всех элементов, создающих исключение, до тех пор, пока все элементы не будут успешными?

1 Ответ

0 голосов
/ 11 ноября 2018

Да Queue - потокобезопасный, но то, как вы его используете, небезопасно.

item_queue.empty? может вернуть true до окончания резьбы.

Вызов Thread.join внутри until item_queue.empty? решит проблему состояния гонки, но в итоге получит программу, которая обрабатывает один элемент за раз из очереди.

until item_queue.empty?
  Thread.new{ processItem(item_queue.pop) }.join
end

Если вы хотите, чтобы элементы в очереди обрабатывались в многопоточном режиме, вам нужно заранее указать, сколько потоков вы хотите, например:

# three threads processing items in the queue
until item_queue.empty?
  t1 = Thread.new{ processItem(item_queue.pop) }
  t2 = Thread.new{ processItem(item_queue.pop) }
  t3 = Thread.new{ processItem(item_queue.pop) }
  t1.join 1
  t2.join 1
  t3.join 1
end
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...