Подключение ZipWith Outlet к другому ZipWith как Inlet - PullRequest
0 голосов
/ 28 декабря 2018

Я не могу определить ZipWith out как In другого ZipWith.Он получает только одно событие от одного из kafka topics, тогда остальная часть потока не работает и не получает никаких других событий.Single zipWith работает для 2 kafka datasources.Всякий раз, когда я представляю pmf и подключаю pm1 и pm2's выходы к входу pmf, это не работает :( Не могли бы вы помочь?

Данные получены из Kafka topics,он работает с datasources, как Source (от 1 до 100). Kafka datasource ведет себя по-разному, все мои тесты выполняются с фиктивной datasources.

package sample

import akka.{Done}  
import akka.actor.ActorSystem  
import akka.kafka.{ConsumerSettings, Subscriptions}  
import akka.kafka.scaladsl.Consumer  
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}  
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, ZipWith}  
import org.apache.kafka.clients.consumer.ConsumerConfig  
import org.apache.kafka.common.serialization.StringDeserializer  

import scala.concurrent.Future

object KafkaApp extends App {
  implicit val system = ActorSystem("StreamBuilder2323")
  implicit val materializer = ActorMaterializer()

  private val s1: Source[String, Consumer.Control] = initializeKafkaSource("topic1",system)
  private val s2: Source[String, Consumer.Control] = initializeKafkaSource("topic2",system)


  private val s3: Source[String, Consumer.Control] = initializeKafkaSource("topic3",system)

  private val s4:Source[String, Consumer.Control] = initializeKafkaSource("topic4",system)


  val concat = GraphDSL.create() { implicit b ⇒
    val zip = b.add(ZipWith[String,String,String](concatFunc _))
    UniformFanInShape(zip.out, zip.in0, zip.in1)
  }

  def concatFunc(s1:String, s2:String): String ={
    s1 + " _ " + s2
  }

  def printss(s:String): Unit ={
    print(s)
  }


  val sinkFinal = Sink.foreach(printss)

  val g = RunnableGraph.fromGraph(GraphDSL.create(sinkFinal) { implicit b ⇒     sink ⇒
    import GraphDSL.Implicits._
    val pm1 = b.add(concat)
    val pm2 = b.add(concat)
    val pmf = b.add(concat)
    s1  ~> pm1.in(0)
    s2  ~> pm1.in(1)

    s3  ~> pm2.in(0)
    s4  ~> pm2.in(1)

    pm1.out ~> pmf.in(0)
    pm2.out ~> pmf.in(1)

    pmf.out ~> sink.in

    ClosedShape
  })

  val max: Future[Done] = g.run()

  def initializeKafkaSource(topicName : String, system : ActorSystem): Source[String,Consumer.Control] ={
    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("consumerGroup")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val subscription = Subscriptions.topics(topicName)
    val streamSource = Consumer.plainSource(consumerSettings, subscription).map(s => s.value())
    streamSource
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...