Разбиение на SubType для потоков akka - PullRequest
0 голосов
/ 12 января 2019

У меня есть поток akka, где у меня есть ADT формы.

sealed trait Message
sealed trait ThisMessage extends Message
sealed trait ThatMessage extends Message

Теперь у меня есть поток обработчиков сообщений и поток обработчиков сообщений. У меня есть входной поток, который принимает тип сообщения.

Чтобы создать разделение, у меня есть следующий разделитель. У меня есть следующее определение для функции разделителя.

 /**
  * Creates a Partition stage that, given a type A, makes a decision to whether to partition to subtype B or subtype C
  *
  * @tparam A type of input
  * @tparam B type of output on the first outlet.
  * @tparam C type of output on the second outlet.
  *
  * @return A partition stage
  */
  def binaryPartitionByType[A, B <: A, C <: A](): Graph[FanOutShape2[A, B, C], NotUsed] =
GraphDSL.create[FanOutShape2[A, B, C]]() { implicit builder =>
  import GraphDSL.Implicits._

  // This is wrong, but I have no idea how to write this.
  val partitioner: UniformFanOutShape[A, A] = builder.add(Partition[A](2, {
    case _: B => 0
    case _: C => 1
  }))

  new FanOutShape2(partitioner.in, partitioner.out(0).outlet, partitioner.out(1).outlet)
}

Я хочу использовать описанный выше метод и использовать ADT в параметрах типа для инициализации разделителя.

Компилятор выдает эту ошибку.

Error:(63, 7) type mismatch;
 found   : akka.stream.FanOutShape2[A,A,A]
 required: akka.stream.FanOutShape2[A,B,C]
      new FanOutShape2(partitioner.in, partitioner.out(0).outlet, 
partitioner.out(1).outlet)

Из того, что я понимаю, объект раздела имеет только вход (в данном случае A, параметризованный тип.

У кого-нибудь есть идеи, как я могу это исправить?

Ответы [ 2 ]

0 голосов
/ 12 января 2019

Вот один из способов создания FanOutShape2[A, B<:A, C<:A] из UniformFanOutShape[A, A], сгенерированного builder.add(Partition[A]()):

import akka.stream.scaladsl._
import akka.stream.{Graph, FanOutShape2}
import akka.NotUsed
import scala.reflect.ClassTag

def binaryPartitionByType[A, B <: A : ClassTag, C <: A : ClassTag](): Graph[FanOutShape2[A, B, C], NotUsed] =
  GraphDSL.create[FanOutShape2[A, B, C]]() { implicit builder =>
    import GraphDSL.Implicits._

    val partitioner = builder.add(Partition[A](2, {
      case _: B => 0
      case _: C => 1
    }))

    val partitionB = builder.add(Flow[A].collect{ case b: B => b })
    val partitionC = builder.add(Flow[A].collect{ case c: C => c })

    partitioner.out(0) ~> partitionB
    partitioner.out(1) ~> partitionC

    new FanOutShape2(partitioner.in, partitionB.out, partitionC.out)
}

// binaryPartitionByType: [A, B <: A, C <: A]()(
//   implicit evidence$1: scala.reflect.ClassTag[B], implicit evidence$2: scala.reflect.ClassTag[C]
// ) akka.stream.Graph[akka.stream.FanOutShape2[A,B,C],akka.NotUsed]

Обратите внимание, что ClassTag необходим, чтобы избежать стирания типа.

0 голосов
/ 12 января 2019

Дело в том, что вы пытаетесь подорвать систему типов. UniformFanOutShape названо "равномерным", потому что все его выходы имеют одинаковый тип. Если бы это было не так, вам не нужно было бы сначала создавать дополнительные FanOutShape2. Если вы собираетесь использовать систему типов, вы должны делать это последовательно, поэтому вы должны изменить тип Outlet s. Попробуйте что-то вроде этого:

new FanOutShape2(partitioner.in, partitioner.out(0).outlet.as[B], partitioner.out(1).outlet.as[C])
...