Преобразования в искровом потоке занимают больше времени, даже если есть 0 сообщений - PullRequest
0 голосов
/ 13 февраля 2019

У меня серьезная проблема с производительностью искрового потока.Для 10-секундного интервала выполнения программы программа занимает около 2 минут.Я пытался отлаживать без 0 сообщений из темы кафки.Большинство преобразований занимают более 30 секунд, даже если нет сообщений для обработки / обработки.Приведенная ниже команда занимает около 40 секунд, даже несмотря на то, что в decodeMessagesDF нет сообщений.

val enrichedDF: DataFrame = decodeMessagesDF.join(broadcast(customer), (decodeMessagesDF( "rowkey") === customer("rowkey")) && (customer("mkt_opto_flag") === "N") && (customer("ads_opto_flag") === "N"))

Кроме того, приведенный ниже код для публикации также занимает около 30 секунд для 0 сообщений для публикации

  message.foreachPartition{ part =>
  val producer = new KafkaProducer[String, String](props)
  part.foreach{ msg =>
    val message = new ProducerRecord[String, String](topic, msg._1, msg._2)
    producer.send(message)
  }
  producer.close()

}

Пожалуйста, дайте мне знать, если что-то не так в коде.Спасибо

1 Ответ

0 голосов
/ 13 февраля 2019

Широковещательная рассылка стоит дорого, если «клиент» имеет большое количество данных. И, возможно, вам следует отключить широковещательную рассылку (клиент) от операции соединения следующим образом:

    val consumerBroadcast = sc.broadcast(customer)
    val enrichedDF: DataFrame = decodeMessagesDF.join(broadcast(consumerBroadcast), (decodeMessagesDF( "rowkey") === customer("rowkey")) && (customer("mkt_opto_flag") === "N") && (customer("ads_opto_flag") === "N"))

Этот код будет транслировать клиента только один раз. НоВаш код будет транслироваться потребителю каждую партию.

...