Правильно ждем окончания процесса - PullRequest
0 голосов
/ 24 августа 2018

В настоящее время у меня есть что-то вроде этого:

ref = Process.monitor(worker)

receive do
  {:DOWN, ^ref, :process, ^worker, :normal} ->
    IO.puts("Normal exit from #{inspect(worker)}")

  {:DOWN, ^ref, :process, ^worker, msg} ->
    IO.puts("Received :DOWN from #{inspect(worker)}")
end

У этого работника есть start_link, который отправляет поток данных потребителю, все это происходит внутри задачи микширования, поэтому, если я не добавлю команду do do, как только миксы умирают, это также убивает дочерний процесс, как будто я запускаю его из оболочки iex -S mix.

flow
... some producer consumer in the middle
|> Flow.into_specs(consumer)

Дело в том, что когда поток пуст (является конечным потоком), работник, кажется, не умирает, прием не запускается, есть ли другой способ добиться этого?

Edit:

Тестирование с использованием фиктивного процесса, который мне даже не нужен receive do

defmodule Mix.Tasks.Stuff do


  def run([]) do
    {:ok, worker} = Worker.start_link([])
  end
end

defmodule Worker do

  def start_link([]) do
    Enum.map([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], fn number ->
      IO.puts "COUNTING"
      Process.sleep(1000)
      IO.puts "-----------"
    end)
  end
end

Но когда рабочий запускает Поток, если я не жду с receive do, он мгновенно умирает, как только я выполняю задачу, так что, возможно, в этом проблема.

1 Ответ

0 голосов
/ 24 августа 2018

Согласно документации на Process.monitor/1:

Если при вызове Process.monitor/1 процесс уже мертв, сообщение :DOWN доставляется немедленно.

То есть, когда Flow пуст, receive do устанавливает обработчики для :DOWN сообщения после его доставки. Для обработки случая следует поменять местами вызовы и подготовить обработчики сообщений до того, как начнет отслеживать процесс.

receive do
  {:DOWN, ref, :process, ^worker, msg} ->
    case [retrieve_ref(), msg] do
      [^ref, :normal] ->
        IO.puts("Normal exit from #{inspect(worker)}")
      [^ref, msg] ->
        IO.puts("Received :DOWN from #{inspect(worker)}")
    end
end

ref = Process.monitor(worker)
store_ref_somewhere_ets_or_agent_or_whatever(ref)

Я предполагаю, что store_ref_somewhere_ets_or_agent_or_whatever/1 и соответствующий retrieve_ref/0 будет легко реализовать.

...