Я пишу сценарий ruby, который зацикливается на файле CSV, а затем для каждой строки извлекает данные из API стороннего производителя, а затем записывает полученные данные в файл CSV.
Я пытаюсь реализовать thread_pool для обработки вызовов API и вставки строк параллельно. Я не совсем уверен, что то, что я делаю, правильно, поэтому приветствуются любые рекомендации.
Одна конкретная проблема c, на которую я наткнулся, это как сохранить порядок исходного файла .
Мое решение было бы передать индекс первого файла потоку, а затем заставить поток вставить строку в этой позиции индекса в CSV.
Вот класс задач, который я буду sh многопоточным.
class Task
def initialize(row, index, conn)
@row = row
@index = index
@file = CSV.open("temp_and_cases_parallel.csv", "ab")
@conn = conn
end
def run
get_climate_data
writte_climate_data
end
private
def get_climate_data
uri = "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/weatherdata/history?&aggregateHours=24&startDateTime=#{@row["day"].strip}T00:00:00&endDateTime=#{@row["day"].strip}T23:59:00&unitGroup=metric&contentType=csv&location=#{@row["lat"].strip},#{@row["long"].strip}&key=#{API_KEY}"
response = @conn.get uri
puts("calling #{uri}")
@climate_info = CSV.parse(response.body, headers: true).first
end
def writte_climate_data
if @index == 1
headers = @row.headers + @climate_info.headers
@file << headers
end
@file << @row.fields + @climate_info.fields
end
end
Так что именно в writte_climate_data
я хотел бы иметь возможность вставлять в @file
в указанной позиции c на основе @index
Вот реализация пула потоков:
class ThreadPool
def initialize(size: 10)
@size = size
@tasks = Queue.new
@pool = []
end
def schedule(*args, &block)
@tasks << [block, args]
end
def start
Thread.new do
loop do
next if @pool.size >= @size
task, args = @tasks.pop
thread = Thread.new do
task.call(*args)
end_thread(thread)
end
@pool << thread
end
end
end
def inactive?
@tasks.empty? && @pool.empty?
end
def end_thread(thread)
@pool.delete(thread)
thread.kill
end
end
Сценарий, который считывает файл case_by_region.csv и для каждой строки создает поток с заданием:
RETRY_OPTIONS = {
max: 10,
interval: 3,
interval_randomness: 0.5,
backoff_factor: 2
}
conn = Faraday.new do |f|
f.request :retry, RETRY_OPTIONS
end
threads = []
thread_pool = ThreadPool.new
thread_pool.start
# CSV.open("temp_and_cases_parallel.csv", "ab") do |temp_and_cases|
CSV.foreach("cases_by_region.csv", headers: true).first(10).each_with_index do |row, index|
thread_pool.schedule do
Task.new(row, index, conn).run
end
end
# end
sleep(1) until thread_pool.inactive?
Как бы вы go достигли этого, как я могу сохранить исходную позицию строки из исходного файла в результирующем csv?