Я не могу определить ZipWith out
как In
другого ZipWith
.Он получает только одно событие от одного из kafka topics
, тогда остальная часть потока не работает и не получает никаких других событий.Single zipWith
работает для 2 kafka
datasources
.Всякий раз, когда я представляю pmf
и подключаю pm1
и pm2's
выходы к входу pmf
, это не работает :( Не могли бы вы помочь?
Данные получены из Kafka topics
,он работает с datasources
, как Source (от 1 до 100). Kafka datasource
ведет себя по-разному, все мои тесты выполняются с фиктивной datasources
.
package sample
import akka.{Done}
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, ZipWith}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future
object KafkaApp extends App {
implicit val system = ActorSystem("StreamBuilder2323")
implicit val materializer = ActorMaterializer()
private val s1: Source[String, Consumer.Control] = initializeKafkaSource("topic1",system)
private val s2: Source[String, Consumer.Control] = initializeKafkaSource("topic2",system)
private val s3: Source[String, Consumer.Control] = initializeKafkaSource("topic3",system)
private val s4:Source[String, Consumer.Control] = initializeKafkaSource("topic4",system)
val concat = GraphDSL.create() { implicit b ⇒
val zip = b.add(ZipWith[String,String,String](concatFunc _))
UniformFanInShape(zip.out, zip.in0, zip.in1)
}
def concatFunc(s1:String, s2:String): String ={
s1 + " _ " + s2
}
def printss(s:String): Unit ={
print(s)
}
val sinkFinal = Sink.foreach(printss)
val g = RunnableGraph.fromGraph(GraphDSL.create(sinkFinal) { implicit b ⇒ sink ⇒
import GraphDSL.Implicits._
val pm1 = b.add(concat)
val pm2 = b.add(concat)
val pmf = b.add(concat)
s1 ~> pm1.in(0)
s2 ~> pm1.in(1)
s3 ~> pm2.in(0)
s4 ~> pm2.in(1)
pm1.out ~> pmf.in(0)
pm2.out ~> pmf.in(1)
pmf.out ~> sink.in
ClosedShape
})
val max: Future[Done] = g.run()
def initializeKafkaSource(topicName : String, system : ActorSystem): Source[String,Consumer.Control] ={
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("consumerGroup")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val subscription = Subscriptions.topics(topicName)
val streamSource = Consumer.plainSource(consumerSettings, subscription).map(s => s.value())
streamSource
}
}