Сохранение порядка в ThreadPool: как вставить строку в CSV в указанной позиции индекса c? - PullRequest
0 голосов
/ 01 мая 2020

Я пишу сценарий ruby, который зацикливается на файле CSV, а затем для каждой строки извлекает данные из API стороннего производителя, а затем записывает полученные данные в файл CSV.
Я пытаюсь реализовать thread_pool для обработки вызовов API и вставки строк параллельно. Я не совсем уверен, что то, что я делаю, правильно, поэтому приветствуются любые рекомендации.
Одна конкретная проблема c, на которую я наткнулся, это как сохранить порядок исходного файла .
Мое решение было бы передать индекс первого файла потоку, а затем заставить поток вставить строку в этой позиции индекса в CSV.

Вот класс задач, который я буду sh многопоточным.

class Task
  def initialize(row, index, conn)
    @row = row
    @index = index
    @file = CSV.open("temp_and_cases_parallel.csv", "ab")
    @conn = conn
  end

  def run
    get_climate_data
    writte_climate_data
  end

  private

  def get_climate_data
    uri = "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/weatherdata/history?&aggregateHours=24&startDateTime=#{@row["day"].strip}T00:00:00&endDateTime=#{@row["day"].strip}T23:59:00&unitGroup=metric&contentType=csv&location=#{@row["lat"].strip},#{@row["long"].strip}&key=#{API_KEY}"
    response = @conn.get uri
    puts("calling #{uri}")
    @climate_info = CSV.parse(response.body, headers: true).first
  end

  def writte_climate_data
    if @index == 1
      headers = @row.headers + @climate_info.headers
      @file << headers
    end
    @file << @row.fields + @climate_info.fields
  end
end

Так что именно в writte_climate_data я хотел бы иметь возможность вставлять в @file в указанной позиции c на основе @index

Вот реализация пула потоков:

class ThreadPool
  def initialize(size: 10)
    @size = size
    @tasks = Queue.new
    @pool = []
  end

  def schedule(*args, &block)
    @tasks << [block, args]
  end

  def start
    Thread.new do
      loop do
        next if @pool.size >= @size
        task, args = @tasks.pop
        thread = Thread.new do
          task.call(*args)
          end_thread(thread)
        end
        @pool << thread
      end
    end
  end

  def inactive?
    @tasks.empty? && @pool.empty?
  end

  def end_thread(thread)
    @pool.delete(thread)
    thread.kill
  end

end

Сценарий, который считывает файл case_by_region.csv и для каждой строки создает поток с заданием:

RETRY_OPTIONS = {
  max: 10,
  interval: 3,
  interval_randomness: 0.5,
  backoff_factor: 2
}

conn = Faraday.new do |f|
  f.request :retry, RETRY_OPTIONS
end

threads = []

thread_pool = ThreadPool.new
thread_pool.start
# CSV.open("temp_and_cases_parallel.csv", "ab") do |temp_and_cases|
  CSV.foreach("cases_by_region.csv", headers: true).first(10).each_with_index do |row, index|
    thread_pool.schedule do
      Task.new(row, index, conn).run
    end
  end
# end

sleep(1) until thread_pool.inactive?

Как бы вы go достигли этого, как я могу сохранить исходную позицию строки из исходного файла в результирующем csv?

1 Ответ

0 голосов
/ 01 мая 2020

Если вам нужен порядок - вы не должны использовать массив. Вы можете попробовать Массив Хэшей.

irb(main):001:0> a = {id: 1, name: "a"}
=> {:id=>1, :name=>"a"}
irb(main):002:0> b = {id: 2, name: "b"}
=> {:id=>2, :name=>"b"}
irb(main):003:0> c = {id: 3, name: "c"}
=> {:id=>3, :name=>"c"}
irb(main):004:0> array = [c, a, b]
=> [{:id=>3, :name=>"c"}, {:id=>1, :name=>"a"}, {:id=>2, :name=>"b"}]
irb(main):006:0> array.sort_by {|h| h[:id] }
=> [{:id=>1, :name=>"a"}, {:id=>2, :name=>"b"}, {:id=>3, :name=>"c"}]

Я создал массив с хэшами, упорядоченными 'c, a, b', а затем вы просто сортируете_с помощью нужного ключа. В этом случае я использовал ключ 'id', чтобы сделать его простым. Вы можете выполнить сортировку по идентификатору на основе строки или любому другому идентификатору, который хранит порядок в порядке, указанном вами sh (некоторое значение уже присутствует в CSV, возможно, отметка времени)

...