У меня есть производитель GenStage <- Потребитель должен что-то прочитать и сделать с сообщением из моей очереди Amazon SQS, что означает, что мой потребитель запрашивает спрос, когда ему нечего делать, а производитель просто получает и пытается получитьполучать события из Амазонки.И сейчас он отлично работает для моего спроса, так как максимальное количество событий, обрабатываемых каждым потребителем, равно 1. Но, учитывая масштабируемость, я бы хотел установить max_demand выше для каждого этапа. </p>
Мой первыйподход состоял в том, чтобы увеличить max_demand до 10. Но затем последовало предостережение, согласно документации:
При реализации потребителей мы часто устанавливаем: max_demand и: min_demand в подписке.Параметр: max_demand указывает максимальное количество событий, которые должны быть в потоке, в то время как параметр: min_demand указывает минимальное пороговое значение, которое необходимо инициировать для увеличения спроса.Например, если: max_demand равно 1000 и: min_demand равно 750, потребитель сначала запросит 1000 событий и запросит больше только после того, как получит не менее 250.
Это означает, что если у меня есть только 1событие в очереди, и никакие другие не появляются или занимают много времени, мне придется подождать до 10, чтобы обработать это 1 событие.Это очень плохо для нашей бизнес-потребности и немного странно, на мой взгляд.
Поэтому мой вопрос:
Как я могу обойти это и установить максимум спроса выше, чем1, но заставляет его обрабатывать любой номер входящего запроса?
(бонусный вопрос): Почему GenStage был разработан таким образом?В чем выгода?
Вот абстракция, которую я сделал для потребления из событий производителя ... Я думаю, что код производителя более прост, поскольку он просто предоставляет данные при запросе, но если кто-то сочтет это необходимым, я могу добавитьздесь:
defmodule MobileApi.GenstageWorkers.AwsSqsConsumer do
use GenStage
require Logger
alias ExAws.SQS
@ex_aws_sqs Application.get_env(
:mobile_api,
:ex_aws_sqs,
ExAws
)
def start_link(init_args, opts \\ []) do
GenStage.start_link(__MODULE__, init_args, opts)
end
def init(%{
producer_id: producer,
queue_name: queue_name,
processor: processor_function,
min_demand: min_demand,
max_demand: max_demand
}) do
state = %{
producer: producer,
subscription: nil,
queue: queue_name,
processor: processor_function
}
GenStage.async_subscribe(
self(),
to: state.producer,
min_demand: min_demand,
max_demand: max_demand
)
{:consumer, state}
end
def handle_subscribe(:producer, _opts, from, state),
do: {:automatic, Map.put(state, :subscription, from)}
def handle_info(:init_ask, %{subscription: subscription} = state) do
GenStage.ask(subscription, state.max_demand)
{:noreply, [], state}
end
def handle_info(_, state), do: {:noreply, [], state}
def handle_events(messages, _from, state) when is_nil(messages), do: {:noreply, [], state}
def handle_events(messages, _from, state) do
handle_messages(messages, state)
{:noreply, [], state}
end
defp handle_messages(messages, state) do
messages
|> Enum.reduce([], &parse_message/2)
|> process_message_batch(state.queue, state.processor)
end
defp parse_message(%{body: body, message_id: message_id, receipt_handle: receipt_handle}, acc) do
case Poison.decode(body) do
{:ok, decoded_body} ->
[{decoded_body, %{id: message_id, receipt_handle: receipt_handle}} | acc]
{:error, error_message} ->
Logger.error(
"An error has ocurred reading from queue, error message: #{inspect(error_message)} message body: #{
inspect(body)
}"
)
acc
end
end
defp process_message_batch([], _, _), do: nil
defp process_message_batch(messages_batch, queue_name, processor) do
{bodies, metadatas} = Enum.unzip(messages_batch)
Enum.map(bodies, fn body -> Task.start(fn -> processor.(body) end) end)
SQS.delete_message_batch(queue_name, metadatas)
|> @ex_aws_sqs.request
end
end