Прямой ответ
Вы можете создать org.reactivestreams.Subscriber
, который никогда не вызывает Subscription.request
:
import org.reactivestreams.Subscriber
def nonSubscriber[T] : Subscriber[T] = new Subscriber[T] {
override def onComplete() : Unit = {}
override def onError(throwable: java.lang.Throwable) : Unit = {}
//should never be called therefore definition is not implemented
override def onNext(t: T) : Unit = ???
//does not call s.request
override def onSubscribe(s: Subscription) : Unit = {}
}
Этот подписчик может быть использован для создания Sink
:
import akka.NotUsed
import akka.stream.scaladsl.Sink
def nonSubscribingSink[T] : Sink[T, NotUsed] =
Sink.fromSubscriber[T](nonSubscriber[T])
Косвенный ответ
Характер вопроса говорит о том, что вы смешиваете свою "бизнес-логику" с логикой потока akka. Возможно, вы захотите изменить дизайн , что может сделать ненужным ответ на ваш вопрос.