Акка: есть ли Раковина, которая никогда не тянет? - PullRequest
1 голос
/ 21 апреля 2019

Нужен Sink, который никогда не тянет, чтобы использовать в юнит-тестах. Уже есть такой или мне нужно его самому кодировать?

Обратите внимание, что Sink.ignore() не поможет, потому что он ВСЕГДА тянет. Мне нужна раковина, которая НИКОГДА не тянет.

Ответы [ 2 ]

2 голосов
/ 22 апреля 2019

Прямой ответ

Вы можете создать org.reactivestreams.Subscriber, который никогда не вызывает Subscription.request:

import org.reactivestreams.Subscriber

def nonSubscriber[T] : Subscriber[T] = new Subscriber[T] {
  override def onComplete() : Unit = {}

  override def onError(throwable: java.lang.Throwable) : Unit = {}

  //should never be called therefore definition is not implemented
  override def onNext(t: T) : Unit = ???

  //does not call s.request
  override def onSubscribe(s: Subscription) : Unit = {}
} 

Этот подписчик может быть использован для создания Sink:

import akka.NotUsed
import akka.stream.scaladsl.Sink

def nonSubscribingSink[T] : Sink[T, NotUsed] = 
  Sink.fromSubscriber[T](nonSubscriber[T])

Косвенный ответ

Характер вопроса говорит о том, что вы смешиваете свою "бизнес-логику" с логикой потока akka. Возможно, вы захотите изменить дизайн , что может сделать ненужным ответ на ваш вопрос.

1 голос
/ 25 апреля 2019

Закончилось создание моей собственной реализации:

// sink that does not pull
val snkStage = object : GraphStage<SinkShape<Message>>() {
    val shape = SinkShape(Inlet.create<Message>("in"))
    override fun shape() = shape
    override fun createLogic(inheritedAttributes: Attributes): GraphStageLogic = object : GraphStageLogic(shape) {
        init {
            setHandler(shape.`in`()) {}
        }
    }
}

Но затем решил использовать более обычную комбинацию Sink.ignore() и Source.maybe() вместо.

...