У меня есть поток, который не производит данные так быстро, как их потребляют.
Итак, у меня есть продюсер, определенный так:
def start_link() do
create_stream
|> GenStage.from_enumerable(name: Producer)
end
Тогда мой производитель-потребитель подписывается на него
def init(:ok) do
{:producer_consumer, :the_state_does_not_matter, subscribe_to: [Producer]}
end
И мой потребитель подписывается на моего производителя-потребителя
def init(:ok) do
{:consumer, :the_state_does_not_matter, subscribe_to: [ProducerConsumer]}
end
Проблема, с которой я сталкиваюсь, заключается в зависании потребителя, я думаю, что в какой-то момент производителю не удалось получить новые данные и, как указано в документации:
Когда перечислимое заканчивается или останавливается, сцена выйдет с
нормальная причина Это означает, что если потребитель подписывается на
перечислимая стадия и опция: cancel установлена на: постоянный, который
по умолчанию потребитель также завершит работу с: обычная причина
Поэтому я читаю больше, и он предлагает добавить опцию cancel:: transient
, чтобы не закончить этап. Я добавил это так, но это не работает, я что-то упустил?
|> GenStage.from_enumerable(name: Producer, cancel: :transient)
Первоначально я использовал Flow.into_stages(flow, [ProducerConsumer])
, но я не могу этого сделать, потому что я не могу ссылаться (или я не знаю, как) на ProducerConsumer из моего дерева супервизоров
children = [
{Producer, []},
{ProducerConsumer, []},
{Consumer, []}
]
Обновление
Обновление передаваемой ссылки на Flow.into_stages из дочернего определения
children = [
{Producer, [name: ProducerConsumer]},
{ProducerConsumer, []},
{Consumer, []}
]
def start_link(producer_consumer) do
create_stream
|> Flow.into_stages(producer_consumer)
end
** (Mix) Не удалось запустить тест приложения: Application.start (: normal, []) вернул ошибку: shutdown:
не удалось запустить ребенка: продюсер
** (EXIT) завершено в: GenServer.call ({: name, ProducerConsumer}, {: "$ subscribe", nil, #PID <0.2031.0>, [cancel:: transient]}, 5000)
** (EXIT) нет связи с Elixir.ProducerConsumer