У меня есть 2 миллиона файлов данных для обработки.
Выполнение следующего кода занимает 2 часа.
out_file = "./output.tsv"
result = "./input.tsv"
|> File.stream!
|> CSV.decode(separator: ?\t, headers: headers)
|> Enum.map(&(elem(&1, 1)))
|> Enum.group_by(&{&1.id, &1.name})
|> Enum.map(&(format_data(&1)))
File.write(out_file, result)
Чтобы продолжить работу с высокой производительностью, я взял Flow
, затем написал следующий код.
Это похоже на Enum
, поэтому я просто добавил Flow.from_enumerable
, а затем переписал Enum
в Flow
.
out_file = "./output.tsv"
result = "./input.tsv"
|> File.stream!
|> CSV.decode(separator: ?\t, headers: headers)
|> Flow.from_enumerable(stages: 4)
|> Flow.map(&(elem(&1, 1)))
|> Flow.group_by(&{&1.id, &1.name})
|> Flow.map(&(format_data(&1)))
File.write(out_file, result)
Это не работает. Я не думаю, что правильно использовать Flow
, как это.
Пожалуйста, дайте мне знать ваш совет для правильного использования Flow
.