Эликсир: как сохранить состояние потока через несколько каналов? - PullRequest
0 голосов
/ 09 апреля 2020

Моя цель : я хочу прочитать первую строку файла, реструктурировать входные данные в конвейере, а затем обработать оставшуюся часть файла в другом конвейере.

Моя проблема : поток сбрасывается при каждом новом конвейере.

Пример кода :

defmodule StrangeStream do
  fs = File.stream!("file.txt")

  Stream.take(fs, 1) |> Enum.to_list() |> IO.inspect()

  Stream.take(fs, 1) |> Enum.to_list() |> IO.inspect()
end

Текстовый файл file.txt :

First line.
Second line.
Third line.

Вывод :

["First line.\n"]
["First line.\n"]

Как видите, поток сбрасывается в каждом конвейере. Каждый конвейер начинается с первой строки в файле. Как мне поддерживать состояние потока между вызовами в конвейер? Заранее спасибо!

Ответы [ 2 ]

0 голосов
/ 10 апреля 2020

Вот как я получил эффект, который хотел. Надеюсь, это поможет другим, кто в этом разбирается.

Еще раз большое спасибо Алексею за то, что он сэкономил мне столько времени.

defmodule StrangeStream do
  do_stuff = fn(something) ->
    # We'd do something useful here
    something
  end

  {:ok, file} = File.open("file.txt", [:read, :line])

  # Read the first line
  first_line = IO.read(file, :line)
  |>  String.trim()
  |>  do_stuff.()
  |>  IO.inspect([label: "first_line"])

  # Create side-effect streams
  print_stream = IO.binstream(:stdio, :line)
  file_stream  = File.stream!("output.txt", [:write, :append])

  # Convert IO to Stream and process
  IO.stream(file, :line)
  |>  Stream.map(&String.trim(&1))
  |>  do_stuff.()
  |>  Stream.into(print_stream, fn(s)-> s <>"\n" end)
  |>  do_stuff.()
  |>  Stream.into(file_stream)
  |>  do_stuff.()
  |>  Enum.to_list()
  |>  IO.inspect([label: "rest of file"])
end

Вывод

first_line: "First line."
Second line.
Third line.
rest of file: ["Second line.", "Third line."]
0 голосов
/ 09 апреля 2020

TL; DR: вы не можете.


В нет изменяемого состояния, поэтому невозможно поддерживать состояние resource.

Единственное подобное: приостановить перечисляемое значение при сокращении, но даже это невозможно напрямую с потоками.

Вы можете прибегнуть к Stream.transform/4 и поддерживать состояние самостоятельно, выбирая соответствующий конвейер.

Sidenote: Enum.to_list/1 уже завершает поток, поэтому подход в вопросе не будет работать вообще.

...