Elixir: настройка GenStages для удовлетворения динамических требований - PullRequest
0 голосов
/ 12 декабря 2018

У меня есть производитель GenStage <- Потребитель должен что-то прочитать и сделать с сообщением из моей очереди Amazon SQS, что означает, что мой потребитель запрашивает спрос, когда ему нечего делать, а производитель просто получает и пытается получитьполучать события из Амазонки.И сейчас он отлично работает для моего спроса, так как максимальное количество событий, обрабатываемых каждым потребителем, равно 1. Но, учитывая масштабируемость, я бы хотел установить max_demand выше для каждого этапа. </p>

Мой первыйподход состоял в том, чтобы увеличить max_demand до 10. Но затем последовало предостережение, согласно документации:

При реализации потребителей мы часто устанавливаем: max_demand и: min_demand в подписке.Параметр: max_demand указывает максимальное количество событий, которые должны быть в потоке, в то время как параметр: min_demand указывает минимальное пороговое значение, которое необходимо инициировать для увеличения спроса.Например, если: max_demand равно 1000 и: min_demand равно 750, потребитель сначала запросит 1000 событий и запросит больше только после того, как получит не менее 250.

Это означает, что если у меня есть только 1событие в очереди, и никакие другие не появляются или занимают много времени, мне придется подождать до 10, чтобы обработать это 1 событие.Это очень плохо для нашей бизнес-потребности и немного странно, на мой взгляд.

Поэтому мой вопрос:

Как я могу обойти это и установить максимум спроса выше, чем1, но заставляет его обрабатывать любой номер входящего запроса?

(бонусный вопрос): Почему GenStage был разработан таким образом?В чем выгода?

Вот абстракция, которую я сделал для потребления из событий производителя ... Я думаю, что код производителя более прост, поскольку он просто предоставляет данные при запросе, но если кто-то сочтет это необходимым, я могу добавитьздесь:

defmodule MobileApi.GenstageWorkers.AwsSqsConsumer do
  use GenStage
  require Logger
  alias ExAws.SQS

  @ex_aws_sqs Application.get_env(
                :mobile_api,
                :ex_aws_sqs,
                ExAws
              )
  def start_link(init_args, opts \\ []) do
    GenStage.start_link(__MODULE__, init_args, opts)
  end

  def init(%{
        producer_id: producer,
        queue_name: queue_name,
        processor: processor_function,
        min_demand: min_demand,
        max_demand: max_demand
      }) do
    state = %{
      producer: producer,
      subscription: nil,
      queue: queue_name,
      processor: processor_function
    }

    GenStage.async_subscribe(
      self(),
      to: state.producer,
      min_demand: min_demand,
      max_demand: max_demand
    )

    {:consumer, state}
  end

  def handle_subscribe(:producer, _opts, from, state),
    do: {:automatic, Map.put(state, :subscription, from)}

  def handle_info(:init_ask, %{subscription: subscription} = state) do
    GenStage.ask(subscription, state.max_demand)

    {:noreply, [], state}
  end

  def handle_info(_, state), do: {:noreply, [], state}

  def handle_events(messages, _from, state) when is_nil(messages), do: {:noreply, [], state}

  def handle_events(messages, _from, state) do
    handle_messages(messages, state)
    {:noreply, [], state}
  end

  defp handle_messages(messages, state) do
    messages
    |> Enum.reduce([], &parse_message/2)
    |> process_message_batch(state.queue, state.processor)
  end

  defp parse_message(%{body: body, message_id: message_id, receipt_handle: receipt_handle}, acc) do
    case Poison.decode(body) do
      {:ok, decoded_body} ->
        [{decoded_body, %{id: message_id, receipt_handle: receipt_handle}} | acc]

      {:error, error_message} ->
        Logger.error(
          "An error has ocurred reading from queue, error message: #{inspect(error_message)} message body: #{
            inspect(body)
          }"
        )

        acc
    end
  end

  defp process_message_batch([], _, _), do: nil

  defp process_message_batch(messages_batch, queue_name, processor) do
    {bodies, metadatas} = Enum.unzip(messages_batch)
    Enum.map(bodies, fn body -> Task.start(fn -> processor.(body) end) end)

    SQS.delete_message_batch(queue_name, metadatas)
    |> @ex_aws_sqs.request
  end
end
...