Я использую Spark Structured Streaming и объединяю два потока из тем Kafka.
Я заметил, что потоковый запрос занимает около 15 секунд длякаждая запись.На приведенном ниже снимке экрана идентификатор этапа 2 занимает 15 секунд.Почему это может быть?
Код выглядит следующим образом:
val kafkaTopic1 = "demo2"
val kafkaTopic2 = "demo3"
val bootstrapServer = "localhost:9092"
val spark = SparkSession
.builder
.master("local")
.getOrCreate
import spark.implicits._
val df1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServer)
.option("subscribe", kafkaTopic1)
.option("failOnDataLoss", false)
.load
val df2 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServer)
.option("subscribe", kafkaTopic2)
.option("failOnDataLoss", false)
.load
val order_details = df1
.withColumn(...)
.select(...)
val invoice_details = df2
.withColumn(...)
.where(...)
order_details
.join(invoice_details)
.where(order_details.col("s_order_id") === invoice_details.col("order_id"))
.select(...)
.writeStream
.format("console")
.option("truncate", false)
.start
.awaitTermination()
С кодом все работает отлично.Единственная проблема - время, чтобы объединить два потока.Как можно оптимизировать этот запрос?