Мой вариант использования: я хочу создать поток Akka, в котором источником является Java-очередь. Поток должен сохранять значения пула из очереди, и если очередь пуста, то дождитесь значений в очереди. Akka stream queue - это еще один вариант, но если в случае какого-либо сбоя я хочу сохранить значения, которые есть в очереди (я не знаю, как это сделать с Akka stream Queue). Я попробовал следующее:
val source: Source[String, NotUsed] = Source.from(queue)
source.ask(1, actor, classOf[String], 10 seconds).runWith(Sink.ignore(), mat)
Я попытался установить свойства idleTimeOut
и keepAlive
, но это не работает. Поток переходит в состояние «Готово», если в очереди нет значений.