Конвертировать Enum в Flow с помощью эликсира - PullRequest
1 голос
/ 12 июня 2019

У меня есть 2 миллиона файлов данных для обработки. Выполнение следующего кода занимает 2 часа.

out_file = "./output.tsv"
result = "./input.tsv"
         |> File.stream!
         |> CSV.decode(separator: ?\t, headers: headers)
         |> Enum.map(&(elem(&1, 1)))
         |> Enum.group_by(&{&1.id, &1.name})
         |> Enum.map(&(format_data(&1)))
File.write(out_file, result)

Чтобы продолжить работу с высокой производительностью, я взял Flow, затем написал следующий код. Это похоже на Enum, поэтому я просто добавил Flow.from_enumerable, а затем переписал Enum в Flow.

out_file = "./output.tsv"
result = "./input.tsv"
         |> File.stream!
         |> CSV.decode(separator: ?\t, headers: headers)
         |> Flow.from_enumerable(stages: 4)
         |> Flow.map(&(elem(&1, 1)))
         |> Flow.group_by(&{&1.id, &1.name})
         |> Flow.map(&(format_data(&1)))
File.write(out_file, result)

Это не работает. Я не думаю, что правильно использовать Flow, как это. Пожалуйста, дайте мне знать ваш совет для правильного использования Flow.

1 Ответ

1 голос
/ 12 июня 2019

Самый верхний пример на главной странице документации Flow показывает, что вам нужно завершить Flow, а также Stream с чем-то вроде Enum.to_list().

В вашем первом фрагменте завершение происходит при первом вызове Enum.map/2 (Stream.map/2 и семейство должно использоваться для потоковой обработки, File.stream!/1 действует так же, как Fire.read/1, потому что вы немедленно завершаете его.)

Кроме того, NimbleCSV был явно создан основной командой Elixir для потоковой обработки CSV. Во всяком случае, ниже, вероятно, будет работать:

result =
  "./input.tsv"
  |> File.stream!
  # here the stream is terminated
  |> CSV.decode(separator: ?\t, headers: headers)
  |> Flow.from_enumerable(stages: 4)
  |> Flow.map(&(elem(&1, 1)))
  |> Flow.group_by(&{&1.id, &1.name})
  |> Flow.map(&(format_data(&1)))
  # ⇓ THIS IS IMPORTANT
  |> Enum.to_list()

Лучшим подходом будет:

result =
  "./input.tsv"
  |> File.stream!(read_ahead: 100_000)
  |> NimbleCSV.RFC4180.parse_stream()
  |> Flow.from_enumerable(stages: 4)
  |> Flow.map(&(elem(&1, 1)))
  |> Flow.group_by(&{&1.id, &1.name})
  |> Flow.map(&(format_data(&1)))
  # ⇓ THIS IS IMPORTANT
  |> Enum.to_list()
...