Как убедиться, что функция не может быть вызвана дважды? - PullRequest
0 голосов
/ 01 декабря 2018

С помощью веб-приложения Phoenix у меня есть контроллер, который вызывает функцию для запуска некоторой обработки.Эта обработка может занять значительное время.

В настоящее время, если я дважды вызываю контроллер, этот процесс завершается и перезапускается (нежелательно).

Как я могу гарантировать, что последующие вызовы кфункция не убивает или не перезапускает процесс и просто игнорируется?

Похоже, поиск в Google предлагает использовать агента, но я не смог заставить его работать.Хотя я могу установить состояние флага, чтобы предотвратить его повторный запуск, процесс все еще умирает при повторном вызове контроллера.

Демонстрация проблемы: https://github.com/corynorris/singleprocess

process failing

Кнопка слева работает, как и ожидалось, и использует javascript только для отправки в конечную точку /start.

Кнопка справа перезапускаетобрабатывать каждый раз, когда его нажимают.

Обновление: Запрос POST отменяет предыдущий запрос и завершает длительный процесс.Запрос GET просто не мог этого сделать, потому что он блокировал любые последующие запросы.Решение здесь состоит в том, чтобы запустить процесс отдельно от основного потока (асинхронно), чтобы он не умирал при отмене почтового запроса).

Для этого я изменил код для использованияGenServer.Я не уверен, что это правильный подход, но у меня есть рабочее решение с приведенными ниже изменениями.

1) Я изменил my_process.ex для сохранения состояния процесса и асинхронного запуска рабочей функции (через Task.async)

  def handle_call(:start, _from, process_map) do
    case Map.get(process_map, :process_running) do
      true ->
        {:reply, process_map, process_map}

      _other ->
        Task.async(&do_work/0)
        updated_process_map = Map.put(process_map, :process_running, true)
        {:reply, updated_process_map, updated_process_map}
    end
  end

2) Я реализовал handle_info для обновления статуса после завершения Task.async:

def handle_info(_, process_map) do
    updated_process_map = Map.put(process_map, :process_running, false)
    {:noreply, updated_process_map}
  end

Он передает статус по каналамAPI:

    SingleProcessWeb.Endpoint.broadcast!("room:notification", "new_msg", %{
      uid: 1,
      body: status
    })

3) Я обновил application.ex, чтобы запустить процесс один раз с помощью:

children = [
  SingleProcessWeb.Endpoint,
  worker(SingleProcess.MyProcess, [[name: :my_process]])
]

Я не уверен, что это лучший подход, но этоработает, поэтому мои следующие шаги будут состоять в том, чтобы изменить его, чтобы он был более общим и абстрагированным от реализации процесса.

1 Ответ

0 голосов
/ 01 декабря 2018

Как я могу обеспечить, чтобы последующие вызовы функции не убивали или не перезапускали процесс и просто игнорировались?

Как насчет чего-то вроде:

defmodule MyProcess do
  use Agent

  def start_link(_args) do
    Agent.start_link(fn -> %{} end, name: :flags)
  end

  def start() do
    #spawn() long running process here
  end

  def running? do
    Agent.get(
      :flags,
      fn(map) -> Map.get(map, :process_running) end
    )
  end

  def set_running_flag do
    Agent.update(
      :flags, 
      fn(map) -> Map.put(map, :process_running, true) end
    )
  end

end 

Затем в вашем действии:

  def your_action(conn, _params) do

    if MyProcess.running?() do
      render(this)
    else         
      MyProcess.start()
      MyProcess.set_running_flag()
      render(that)
    end

  end

Хотя я могу установить состояние флага, чтобы предотвратить его повторный запуск, процесс все еще умирает, когда контроллер вызывается во второй раз.

Не связываться с процессом.

Эта обработка может занять значительное время

Да, но вам нужно?чтобы получить ответ от процесса, или процесс может быть запущен без дальнейшего контакта?

Ответ на комментарий :

Вот что я сделал:

(Я пересмотрел это еще немного, чтобы агент никогда не был убит, и чтобы позволить новому счетчику запуститься, когда закончится любой предыдущий счетчик. Однако, если счетчик вещает, то никакие запросы не могут запускать другой.counter.)

lib / hello / counter.ex

defmodule Hello.Counter do

  def start(count) do
    set_counting_flag(true)
    spawn(__MODULE__, :publish_count, [count])
  end

  def publish_count(0) do
    set_counting_flag(false)
  end
  def publish_count(count) do
    Process.sleep 1_000
    HelloWeb.CountChannel.broadcast_count(count)
    publish_count(count-1)
  end

  def is_counting? do
    Agent.get(:my_agent, 
      fn map -> Map.get(map, :counter_running) end
    )
  end

  def set_counting_flag(bool) do
    Agent.update(:my_agent, 
      fn map ->
        Map.update(map, 
                   :counter_running, 
                   bool, 
                   fn _ -> bool end
        )
      end 
    )
  end

end

lib / hello / my_agent.ex:

defmodule Hello.MyAgent do
  use Agent

  def start_link(_args) do
    Agent.start_link(fn -> %{} end, name: :my_agent)
  end

end

lib / hello_web / channel / count_channel.ex:

defmodule HelloWeb.CountChannel do
  use Phoenix.Channel

  #auth
  def join("count:lobby", _msg, socket) do
    {:ok, socket}
  end
  def join("count:" <> _other, _params, _socket) do
    {:error, %{reason: "unauthorized"}}
  end

  def handle_in("new_msg", %{"body" => body}, socket) do
    broadcast!(socket, "new_msg", %{body: body})
    {:noreply, socket}
  end

  #You can use a Phoenix function to broadcast directly to an Endpoint:
  def broadcast_count(n) do
    HelloWeb.Endpoint.broadcast!("count:lobby", "new_msg", %{body: "#{n}"})
  end

end

lib / hello_web / channel /user_socket.ex:

defmodule HelloWeb.UserSocket do
  use Phoenix.Socket

  ## Channels
  channel "count:*", HelloWeb.CountChannel

  # Socket params are passed from the client and can
  # be used to verify and authenticate a user. After
  # verification, you can put default assigns into
  # the socket that will be set for all channels, ie
  #
  #     {:ok, assign(socket, :user_id, verified_user_id)}
  #
  # To deny connection, return `:error`.
  #
  # See `Phoenix.Token` documentation for examples in
  # performing token verification on connect.
  def connect(_params, socket, _connect_info) do
    {:ok, socket}
  end

  # Socket id's are topics that allow you to identify all sockets for a given user:
  #
  #     def id(socket), do: "user_socket:#{socket.assigns.user_id}"
  #
  # Would allow you to broadcast a "disconnect" event and terminate
  # all active sockets and channels for a given user:
  #
  #     HelloWeb.Endpoint.broadcast("user_socket:#{user.id}", "disconnect", %{})
  #
  # Returning `nil` makes this socket anonymous.
  def id(_socket), do: nil
end

lib / hello_web / router.ex:

  ...
  ...
  scope "/", HelloWeb do
    pipe_through :browser

    get "/", PageController, :index
    get "/count/:count", PageController, :counter

  end

  # Other scopes may use custom stacks.
  # scope "/api", HelloWeb do
  #   pipe_through :api
  # end
end

lib / hello_web / controllers / page_controller.ex:

defmodule HelloWeb.PageController do
  use HelloWeb, :controller

  def index(conn, _params) do
    render(conn, "index.html")
  end

  def counter(conn, %{"count" => count}) do
    if ! Hello.Counter.is_counting? do
      {int_part, _rest} = Integer.parse(count)
      Hello.Counter.start(int_part)
    end

    render(conn, "index.html")
  end
end

привет/assets/js/socket.js:

// NOTE: The contents of this file will only be executed if
// you uncomment its entry in "assets/js/app.js".

// To use Phoenix channels, the first step is to import Socket,
// and connect at the socket path in "lib/web/endpoint.ex".
//
// Pass the token on params as below. Or remove it
// from the params if you are not using authentication.
import {Socket} from "phoenix"

let socket = new Socket("/socket", {params: {token: window.userToken}})

// When you connect, you'll often need to authenticate the client.
// For example, imagine you have an authentication plug, `MyAuth`,
// which authenticates the session and assigns a `:current_user`.
// If the current user exists you can assign the user's token in
// the connection for use in the layout.
//
// In your "lib/web/router.ex":
//
//     pipeline :browser do
//       ...
//       plug MyAuth
//       plug :put_user_token
//     end
//
//     defp put_user_token(conn, _) do
//       if current_user = conn.assigns[:current_user] do
//         token = Phoenix.Token.sign(conn, "user socket", current_user.id)
//         assign(conn, :user_token, token)
//       else
//         conn
//       end
//     end
//
// Now you need to pass this token to JavaScript. You can do so
// inside a script tag in "lib/web/templates/layout/app.html.eex":
//
//     <script>window.userToken = "<%= assigns[:user_token] %>";</script>
//
// You will need to verify the user token in the "connect/3" function
// in "lib/web/channels/user_socket.ex":
//
//     def connect(%{"token" => token}, socket, _connect_info) do
//       # max_age: 1209600 is equivalent to two weeks in seconds
//       case Phoenix.Token.verify(socket, "user socket", token, max_age: 1209600) do
//         {:ok, user_id} ->
//           {:ok, assign(socket, :user, user_id)}
//         {:error, reason} ->
//           :error
//       end
//     end
//
// Finally, connect to the socket:
socket.connect()

// Now that you are connected, you can join channels with a topic:
let channel = socket.channel("count:lobby", {})

channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

let text_input_box = document.querySelector("#msg_to_send")
let msg_div = document.querySelector("#received_messages")

text_input_box.addEventListener("keypress", event => {
  let return_key = 13

  if (event.keyCode == return_key) {
    channel.push("new_msg", {body: text_input_box.value})
    text_input_box.value = ""
  }

})

channel.on("new_msg", payload => {
  let new_msg_div = document.createElement('div')
  new_msg_div.innerText = `[${Date()}]: ${payload.body}`
  msg_div.appendChild(new_msg_div)
})

export default socket

lib / hello_web / templates / page / index.html

<div id="received_messages"></div>
<input id="msg_to_send" type="text"></input>

lib / hello / application.ex:

defmodule Hello.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  def start(_type, _args) do
    # List all child processes to be supervised
    children = [
      # Start the Ecto repository
      Hello.Repo,

      # Start the endpoint when the application starts
      HelloWeb.Endpoint,

      # Starts a worker by calling: Hello.Worker.start_link(arg)
      # {Hello.Worker, arg},

      Hello.MyAgent  #calls Hello.MyAgent.start_link([])
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Hello.Supervisor]
    Supervisor.start_link(children, opts)
  end

  # Tell Phoenix to update the endpoint configuration
  # whenever the application is updated.
  def config_change(changed, _new, removed) do
    HelloWeb.Endpoint.config_change(changed, removed)
    :ok
  end
end

enter image description here

...