Динамический граф вычислений во время выполнения с использованием Elixir Genstage - PullRequest
0 голосов
/ 24 октября 2018

Я бы хотел иметь возможность динамически изменять конвейер вычислений во время выполнения, но, похоже, GenStage требует, чтобы вычислитель вычислялся во время компиляции с помощью механизма subscribe_to: [...].Есть ли способ создания динамических вычислительных графов?Например, ниже я хотел бы во время выполнения переключаться между вершинами «вычитать 7» и «вычитать 4» в моем графике конвейера.

enter image description here

Возможно ли это с помощью GenStage?Скорее всего, у меня будут очень сложные конвейеры, поэтому мне нужно решение, которое масштабируется для изменения графиков сложными способами, в отличие от специальных решений, таких как, например, параметризация целого числа для вычитания.Я хотел бы иметь возможность добавлять или удалять целые поддеревья, переключаться между поддеревьями и добавлять узлы в график, в том числе вставляя их в середину любого поддерева, включая главное дерево.

Пожалуйста, обратитесь к РЕДАКТИРОВАТЬ вниз

Вот начальный производитель:

defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end

Вот один из производителей_консумеров:

defmodule GenstageTest.PcTimesFive do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageTest.PcAddOne]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 * 5))
    {:noreply, numbers, state}
  end
end

ивот конечный потребитель:

defmodule GenstageTest.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageTest.PcDivTwo]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event, state})
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

I Все это смоделировано из учебника Elixir School Genstage .

Все модули и mix.exs могут быть найдены на github .

РЕДАКТИРОВАТЬ 3 дня спустя после частичного ответа от @AquarHEAD L.

Мне удалось заставить работать подписки во время выполнения.Вот некоторые измененные производители, производители_потребители и потребители соответственно:

Производитель:

defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end

  def handle_info({:doprint}, state) do
    IO.puts "yep"
    {:noreply, [], state}
  end

  def handle_info({:cancel, sublink}, state) do
    GenStage.cancel sublink, []
    {:noreply, [], state}
  end

end

Производитель_потребителя:

defmodule GenstageTest.PcAddOne do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 + 1))
    {:noreply, numbers, state}
  end
end

Потребитель:

defmodule GenstageTest.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect event
      #File.write("/home/tbrowne/scratch/output.txt", 
      #  Kernel.inspect(event) <> " ", [:append])
      :timer.sleep(100)
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

Теперь, когда все они доступны в каталоге lib (не забудьте добавить {:gen_stage, "~> 0.11"} в ваши файлы mix.exs), или скопировать и вставить в IEX, то следующее будет отлично работать:

{:ok, p} = GenstageTest.Producer.start_link(0)
{:ok, a1} = GenstageTest.PcAddOne.start_link()
{:ok, c} = GenstageTest.Consumer.start_link()
{:ok, link1} = GenStage.sync_subscribe(a1, to: p, min_demand: 0, max_demand: 1, cancel: :transient)
{:ok, link2} = GenStage.sync_subscribe(c, to: a1, min_demand: 0, max_demand: 1, cancel: :transient)

проблема в том, что я все еще не знаю, как отменить подписку.Существует функция отмены , а также функция останова .Например, GenStage.stop(c), кажется, ничего не делает, в то время как мои различные попытки GenStage.cancel/3 только дают ошибки.

Напомним, что сейчас мне нужно уметь останавливать определенные этапы и заменять их другими.Каков синтаксис отмены подписки и откуда она вызывается?Это не очень хорошо объяснено в документах, так как нет конкретного примера.

Ответы [ 2 ]

0 голосов
/ 02 ноября 2018

Почему бы не реализовать свой собственный GenStage.Dispatcher?Вот поведение

0 голосов
/ 26 октября 2018

Вы можете абсолютно изменить конвейер во время выполнения, проверить первый пример в документации GenStage , , вы также можете использовать режим :manual для точного управления запросом .Также есть API для отмены подписки .Я думаю, что этого достаточно для динамического управления конвейерами GenStage.

...