Вы можете использовать GenServer или Агент
GenServer
Общая идея состоит в том, чтобы иметь процесс GenServer, содержащий данные, которые должны быть обработаныа также обрабатывает фоновую обработку.Используя GenServer.cast/2
, мы можем отправить сообщение процессу асинхронно.Поэтому всякий раз, когда контроллер получает запрос, мы добавляем новый элемент в очередь, а также проверяем, достигнут ли размер пакета, и обрабатываем его.
# In Controller (page_controller.ex) module
def index(conn, params) do
App.BatchProcessor.add_item(params)
conn|>json(%{success: true})
end
Добавить модуль для GenServer.Вы можете добавить новый файл lib/batch_processor.ex
defmodule App.BatchProcessor do
use GenServer
@batch_size 10 #whenever queue reaches this size we'll start processing
def init(_) do
initial_queue = []
{:ok, initial_queue}
end
def start_link()do
GenServer.start_link(__MODULE__, [], [name: __MODULE__])
end
#api function to add item to the
def add_item(data)do
GenServer.cast({:add, data}, __MODULE__)
end
# implement GenServer behavior function to handle cast messages for adding item to the queue
def handle_cast({:add, data}, queue) do
update_queue = [data | queue] #addpend new item to front of queue
#check if batch size is reached and process current batch
if Enum.count(updated_queue) >= @batch_size do
#send async message to current process to process batch
GenServer.cast(__MODULE__, :process_batch)
end
{:noreply, updated_queue}
end
#implement GenServer behavior function to handle cast messages for processing batch
def handle_cast(:process_queue, queue)do
spawn(fn ->
Enum.each(queue, fn data ->
IO.inspect(data)
end)
end)
{:noreply, []} # reset queue to empty
end
end
Запустить процесс BatchProcessor при запуске приложения Phoenix
#application.ex
children = [
# Start the endpoint when the application starts
supervisor(App.Web.Endpoint, []),
# Start your own worker by calling: App.Web.Worker.start_link(arg1, arg2, arg3)
worker(App.BatchProcessor, []),
]
Подробнее о GenServer Надеюсь, это поможет