Почему потоковое объединение запросов по темам kafka может занимать так много времени? - PullRequest
0 голосов
/ 27 ноября 2018

Я использую Spark Structured Streaming и объединяю два потока из тем Kafka.

DAG for the job

Я заметил, что потоковый запрос занимает около 15 секунд длякаждая запись.На приведенном ниже снимке экрана идентификатор этапа 2 занимает 15 секунд.Почему это может быть?

Time taken by each stage

Код выглядит следующим образом:

  val kafkaTopic1 = "demo2"
  val kafkaTopic2 = "demo3"
  val bootstrapServer = "localhost:9092"

  val spark = SparkSession
    .builder
    .master("local")
    .getOrCreate

  import spark.implicits._

  val df1 = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServer)
    .option("subscribe", kafkaTopic1)
    .option("failOnDataLoss", false)
    .load

  val df2 = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServer)
    .option("subscribe", kafkaTopic2)
    .option("failOnDataLoss", false)
    .load

  val order_details = df1
    .withColumn(...)
    .select(...)

  val invoice_details = df2
    .withColumn(...)
    .where(...)

  order_details
    .join(invoice_details)
    .where(order_details.col("s_order_id") === invoice_details.col("order_id"))
    .select(...)
    .writeStream
    .format("console")
    .option("truncate", false)
    .start
    .awaitTermination()

С кодом все работает отлично.Единственная проблема - время, чтобы объединить два потока.Как можно оптимизировать этот запрос?

1 Ответ

0 голосов
/ 03 декабря 2018

Вполне возможно, что время выполнения не является удовлетворительным, учитывая главный URL, то есть .master("local").Измените его на local[*] как минимум, и вы должны найти соединение быстрее.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...