GenStage.from_enumerable зависает с прерывистым потоком - PullRequest
0 голосов
/ 27 июня 2018

У меня есть поток, который не производит данные так быстро, как их потребляют.

Итак, у меня есть продюсер, определенный так:

def start_link() do
  create_stream
  |> GenStage.from_enumerable(name: Producer)
end

Тогда мой производитель-потребитель подписывается на него

  def init(:ok) do
    {:producer_consumer, :the_state_does_not_matter, subscribe_to: [Producer]}
  end

И мой потребитель подписывается на моего производителя-потребителя

  def init(:ok) do
    {:consumer, :the_state_does_not_matter, subscribe_to: [ProducerConsumer]}
  end

Проблема, с которой я сталкиваюсь, заключается в зависании потребителя, я думаю, что в какой-то момент производителю не удалось получить новые данные и, как указано в документации:

Когда перечислимое заканчивается или останавливается, сцена выйдет с нормальная причина Это означает, что если потребитель подписывается на перечислимая стадия и опция: cancel установлена ​​на: постоянный, который по умолчанию потребитель также завершит работу с: обычная причина

Поэтому я читаю больше, и он предлагает добавить опцию cancel:: transient, чтобы не закончить этап. Я добавил это так, но это не работает, я что-то упустил?

|> GenStage.from_enumerable(name: Producer, cancel: :transient)

Первоначально я использовал Flow.into_stages(flow, [ProducerConsumer]), но я не могу этого сделать, потому что я не могу ссылаться (или я не знаю, как) на ProducerConsumer из моего дерева супервизоров

children = [
  {Producer, []},
  {ProducerConsumer, []},
  {Consumer, []}
]

Обновление

Обновление передаваемой ссылки на Flow.into_stages из дочернего определения

children = [
  {Producer, [name: ProducerConsumer]},
  {ProducerConsumer, []},
  {Consumer, []}
]

def start_link(producer_consumer) do
  create_stream
  |> Flow.into_stages(producer_consumer)
end

** (Mix) Не удалось запустить тест приложения: Application.start (: normal, []) вернул ошибку: shutdown: не удалось запустить ребенка: продюсер ** (EXIT) завершено в: GenServer.call ({: name, ProducerConsumer}, {: "$ subscribe", nil, #PID <0.2031.0>, [cancel:: transient]}, 5000) ** (EXIT) нет связи с Elixir.ProducerConsumer

1 Ответ

0 голосов
/ 29 июня 2018

Ошибка:

** (Mix) Не удалось запустить тест приложения: Application.start (: normal, []) вернул ошибку: shutdown: не удалось запустить child: Producer ** (EXIT) выход из: GenServer.call ({: name, ProducerConsumer}, {: "$ подписка", ноль, #PID <0.2031.0>, [отмена:: переходный]}, 5000) ** (EXIT) нет связи с Elixir.ProducerConsumer

Просто означает, что когда Flow.into_stages пытается синхронизировать с предоставленным потребителем, этот потребитель уже должен быть запущен.

Итак, при контроле важен порядок, как-то так:

children = [
  Consumer,
  FlowProducerWorker # worker which implements Flow.into_stages(flow, [Consumer])
]
...