Я строю конвейер для обработки, агрегирования и преобразования данных из CSV-файлов, а затем записываю их обратно в другой CSV-файл… Я загружаю строки из CSV-файла из 19 столбцов и с помощью некоторых математических операций (стиль уменьшения карты) выполняет обратную запись 30 столбцы в другом csv.
И все шло нормально, пока я не попытался загрузить в приложение файл 25 Мб, 250000 строк, а затем я решил выполнять потоковую передачу всех операций вместо того, чтобы с нетерпением обрабатывать ... но теперь, когда я меняю функцию за функцией с помощью потоков, Я столкнулся с проблемой, заключающейся в том, что я не понимаю, почему после создания всего 5 полей, когда я пытаюсь записать в файл, программа просто зависает и останавливает запись после нескольких тысяч строк.
Я транслирую каждую функцию, поэтому она не должна иметь никаких блокировок, насколько я понимаю, и для первых тысяч записей она работает нормально, поэтому мне интересно, что происходит, в обозревателе erlang я вижу только использование ресурсов, сброшенных до 0 и больше не пишет в файл.
Это моя функция потока (прежде чем я просто загружаю из файла), а затем моя функция записи:
def process(stream, field_longs_lats, team_settings) do
main_stream =
stream
# Removing once that don't have timestamp
|> Stream.filter(fn [time | _tl] -> time != "-" end)
# Filter all duplicated rows by timestamp
|> Stream.uniq_by(fn [time | _tl] -> time end)
|> Stream.map(&Transform.apply_row_tranformations/1)
cumulative_milli =
main_stream
|> Stream.map(fn [_time, milli | _tl] -> milli end)
|> Statistics.cumulative_sum()
speeds =
main_stream
|> Stream.map(fn [_time, _milli, _lat, _long, pace | _tl] ->
pace
end)
|> Stream.map(&Statistics.get_speed/1)
cals = Motion.calories_per_timestep(cumulative_milli, cumulative_milli)
long_stream =
main_stream
|> Stream.map(fn [_time, _milli, lat | _tl] -> lat end)
lat_stream =
main_stream
|> Stream.map(fn [_time, _milli, _lat, long | _tl] -> long end)
x_y_tuples =
RelativeCoordinates.relative_coordinates(long_stream, lat_stream, field_longs_lats)
x = Stream.map(x_y_tuples, fn {x, _y} -> x end)
y = Stream.map(x_y_tuples, fn {_x, y} -> y end)
[x, y, cals, long_stream, lat_stream]
end
запись:
def write_to_file(keyword_list, file_name) do
file = File.open!(file_name, [:write, :utf8])
IO.write(file, V4.empty_v4_headers() <> "\n")
keyword_list
|> Stream.zip()
|> Stream.each(&write_tuple_row(&1, file))
|> Stream.run()
File.close(file)
end
@spec write_tuple_row(tuple(), pid()) :: :ok
def write_tuple_row(tuple, file) do
IO.inspect("writing #{inspect(tuple)}")
row_content =
Tuple.to_list(tuple)
|> Enum.map_join(",", fn value -> Transformations.to_string(value) end)
IO.write(file, row_content <> "\n")
end