Потоковая передача: VM останавливается для большого файла - PullRequest
0 голосов
/ 13 апреля 2019

Я строю конвейер для обработки, агрегирования и преобразования данных из CSV-файлов, а затем записываю их обратно в другой CSV-файл… Я загружаю строки из CSV-файла из 19 столбцов и с помощью некоторых математических операций (стиль уменьшения карты) выполняет обратную запись 30 столбцы в другом csv.

И все шло нормально, пока я не попытался загрузить в приложение файл 25 Мб, 250000 строк, а затем я решил выполнять потоковую передачу всех операций вместо того, чтобы с нетерпением обрабатывать ... но теперь, когда я меняю функцию за функцией с помощью потоков, Я столкнулся с проблемой, заключающейся в том, что я не понимаю, почему после создания всего 5 полей, когда я пытаюсь записать в файл, программа просто зависает и останавливает запись после нескольких тысяч строк. Frozen writing process after thousands of lines been written Я транслирую каждую функцию, поэтому она не должна иметь никаких блокировок, насколько я понимаю, и для первых тысяч записей она работает нормально, поэтому мне интересно, что происходит, в обозревателе erlang я вижу только использование ресурсов, сброшенных до 0 и больше не пишет в файл.

Это моя функция потока (прежде чем я просто загружаю из файла), а затем моя функция записи:

def process(stream, field_longs_lats, team_settings) do
    main_stream =
      stream
      # Removing once that don't have timestamp
      |> Stream.filter(fn [time | _tl] -> time != "-" end)
      # Filter all duplicated rows by timestamp
      |> Stream.uniq_by(fn [time | _tl] -> time end)
      |> Stream.map(&Transform.apply_row_tranformations/1)

    cumulative_milli =
      main_stream
      |> Stream.map(fn [_time, milli | _tl] -> milli end)
      |> Statistics.cumulative_sum()

    speeds =
      main_stream
      |> Stream.map(fn [_time, _milli, _lat, _long, pace | _tl] ->
        pace
      end)
      |> Stream.map(&Statistics.get_speed/1)

    cals = Motion.calories_per_timestep(cumulative_milli, cumulative_milli)

    long_stream =
      main_stream
      |> Stream.map(fn [_time, _milli, lat | _tl] -> lat end)

    lat_stream =
      main_stream
      |> Stream.map(fn [_time, _milli, _lat, long | _tl] -> long end)

    x_y_tuples =
      RelativeCoordinates.relative_coordinates(long_stream, lat_stream, field_longs_lats)

    x = Stream.map(x_y_tuples, fn {x, _y} -> x end)
    y = Stream.map(x_y_tuples, fn {_x, y} -> y end)

    [x, y, cals, long_stream, lat_stream]
  end

запись:

def write_to_file(keyword_list, file_name) do
    file = File.open!(file_name, [:write, :utf8])

    IO.write(file, V4.empty_v4_headers() <> "\n")

    keyword_list
    |> Stream.zip()
    |> Stream.each(&write_tuple_row(&1, file))
    |> Stream.run()

    File.close(file)
  end

@spec write_tuple_row(tuple(), pid()) :: :ok
  def write_tuple_row(tuple, file) do
    IO.inspect("writing #{inspect(tuple)}")

    row_content =
      Tuple.to_list(tuple)
      |> Enum.map_join(",", fn value -> Transformations.to_string(value) end)

    IO.write(file, row_content <> "\n")
  end
...