Как создать пакетный процесс в запросах эликсир феникс - PullRequest
0 голосов
/ 29 сентября 2018

Я должен создать API с отличной производительностью, и я хочу создать его с помощью Elixir. У меня есть процесс (медленный), который я должен запускать после некоторых запросов.Я хочу сделать этот поток

В каждом запросе сохранять данные, полученные в памяти. После x запросов, отправить в другой API (или через x секунд)

В узле я могу сделать это:

let batchData = []
const handlerRequest = (req, res) => {
  batchData.push(req. body.data)
  if (batchData > 1000) {
    // Process to send to another api
    batchData = []
  }
  res.json({ success: true })
}

Или

let batchData = []
setInterval(() => {
  if (batchData > 1000) {
    // Process to send to another api
    batchData = []
  }
}, 10000)

const handlerRequest = (req, res) => {
  batchData.push(req. body.data)
  res.json({ success: true })
}

Как я могу сделать что-то подобное в Elixir Phoenix?

Спасибо за это

Ответы [ 2 ]

0 голосов
/ 30 сентября 2018

Вы можете использовать GenServer или Агент

GenServer

Общая идея состоит в том, чтобы иметь процесс GenServer, содержащий данные, которые должны быть обработаныа также обрабатывает фоновую обработку.Используя GenServer.cast/2, мы можем отправить сообщение процессу асинхронно.Поэтому всякий раз, когда контроллер получает запрос, мы добавляем новый элемент в очередь, а также проверяем, достигнут ли размер пакета, и обрабатываем его.

# In Controller (page_controller.ex) module
def index(conn, params) do
   App.BatchProcessor.add_item(params)
   conn|>json(%{success: true})
end

Добавить модуль для GenServer.Вы можете добавить новый файл lib/batch_processor.ex

defmodule App.BatchProcessor do
   use GenServer

   @batch_size 10 #whenever queue reaches this size we'll start processing

   def init(_) do
     initial_queue = []
     {:ok, initial_queue}
   end

   def start_link()do
      GenServer.start_link(__MODULE__, [], [name: __MODULE__])
   end

   #api function to add item to the 
   def add_item(data)do
      GenServer.cast({:add, data}, __MODULE__)
   end

   # implement GenServer behavior function to handle cast messages for adding item to the queue
   def handle_cast({:add, data}, queue) do
       update_queue = [data | queue] #addpend new item to front of queue

       #check if batch size is reached and process current batch
       if Enum.count(updated_queue) >= @batch_size do
          #send async message to current process to process batch
          GenServer.cast(__MODULE__, :process_batch)
       end
       {:noreply, updated_queue}
   end

   #implement GenServer behavior function to handle cast messages for processing batch
   def handle_cast(:process_queue, queue)do
     spawn(fn ->
        Enum.each(queue, fn data -> 
           IO.inspect(data)
        end)
     end)
     {:noreply, []} # reset queue to empty
   end
end

Запустить процесс BatchProcessor при запуске приложения Phoenix

#application.ex

children = [
  # Start the endpoint when the application starts
  supervisor(App.Web.Endpoint, []),
  # Start your own worker by calling: App.Web.Worker.start_link(arg1, arg2, arg3)
  worker(App.BatchProcessor, []),
]

Подробнее о GenServer Надеюсь, это поможет

0 голосов
/ 30 сентября 2018

Вот подход с использованием GenServer.Я предполагаю, что вы хотите запустить таймер при получении первого элемента.

defmodule RequestHandler do
  use GenServer

  @name __MODULE__
  @timeout 5_000
  @size 5

  def start_link(args \\ []) do
    GenServer.start_link(__MODULE__, args, name: @name)
  end

  def request(req) do
    GenServer.cast(@name, {:request, req})
  end

  def init(_) do
    {:ok, %{timer_ref: nil, requests: []}}
  end

  def handle_cast({:request, req}, state) do
    {:noreply, state |> update_in([:requests], & [req | &1]) |> handle_request()}
  end

  def handle_info(:timeout, state) do
    # sent to another API
    send_api(state.requests)
    {:noreply, reset_requests(state)}
  end

  defp handle_request(%{requests: requests} = state) when length(requests) == 1 do
    start_timer(state)
  end

  defp handle_request(%{requests: requests} = state) when length(requests) > @size do
    # sent to another API
    send_api(requests)
    reset_requests(state)
  end

  defp handle_request(state) do
    state
  end

  defp reset_requests(state) do
    state
    |> Map.put(:requests, [])
    |> cancel_timer()
  end

  defp start_timer(state) do
    timer_ref = Process.send_after(self(), :timeout, @timeout)
    state
    |> cancel_timer()
    |> Map.put(:timer_ref, timer_ref)
  end

  defp cancel_timer(%{timer_ref: nil} = state) do
    state
  end

  defp cancel_timer(%{timer_ref: timer_ref} = state) do
    Process.cancel_timer(timer_ref)
    Map.put(state, :timer_ref, nil)
  end

  defp send_api(requests) do
    IO.puts "sending #{length requests} requests"
  end
end

А вот несколько тестов

iex(5)> RequestHandler.start_link
{:ok, #PID<0.119.0>}
iex(6)> for i <- 1..6, do: Request
[Request, Request, Request, Request, Request, Request]
iex(7)> for i <- 1..6, do: RequestHandler.request(i)
sending 6 requests
[:ok, :ok, :ok, :ok, :ok, :ok]
iex(8)> for i <- 1..7, do: RequestHandler.request(i)
sending 6 requests
[:ok, :ok, :ok, :ok, :ok, :ok, :ok]
sending 1 requests
iex(9)> for i <- 1..3, do: RequestHandler.request(i)
[:ok, :ok, :ok]
sending 3 requests
iex(10)>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...