Влияет ли пропущенная стадия на производительность Spark? - PullRequest
0 голосов
/ 14 апреля 2020

Я выполняю задание на потоковую структурированную обработку, которое включает создание пустого информационного кадра и его обновление с использованием каждой микропакета, как показано ниже. С каждым выполнением микропакета число этапов увеличивается на 4. Чтобы избежать повторного вычисления, я сохраняю обновленный StaticDF в памяти после каждого обновления внутри l oop. Это помогает пропустить те дополнительные этапы, которые создаются с каждой новой микропакетой.

Мои вопросы -

1) Даже если общее количество завершенных этапов остается таким же, как увеличенные этапы всегда пропущено, но может ли это привести к проблемам с производительностью, поскольку в один момент времени на пропущенных этапах могут быть миллионы?
2) Что происходит, когда какая-то часть или весь кэшированный RDD недоступен? (ошибка узла / исполнителя). Документация Spark гласит, что до сих пор не материализованы все данные, полученные из нескольких микропакетов, поэтому это означает, что ему потребуется снова прочитать все события из Kafka для регенерации staticDF?

// one time creation of empty static(not streaming) dataframe
val staticDF_schema = new StructType()
      .add("product_id", LongType)
      .add("created_at", LongType)
var staticDF = sparkSession
.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], staticDF_schema)

// Note : streamingDF was created from Kafka source
    streamingDF.writeStream
      .trigger(Trigger.ProcessingTime(10000L))
      .foreachBatch {
        (micro_batch_DF: DataFrame) => {

        // fetching max created_at for each product_id in current micro-batch
          val staging_df = micro_batch_DF.groupBy("product_id")
            .agg(max("created").alias("created"))

          // Updating staticDF using current micro batch
          staticDF = staticDF.unionByName(staging_df)
          staticDF = staticDF
            .withColumn("rnk",
              row_number().over(Window.partitionBy("product_id").orderBy(desc("created_at")))
            ).filter("rnk = 1")
            .drop("rnk")
              .cache()

          }

enter image description here

1 Ответ

0 голосов
/ 23 апреля 2020

Несмотря на то, что пропущенные этапы не требуют каких-либо вычислений, но моя работа начинала давать сбой после определенного количества партий. Это происходило из-за роста DAG при каждом пакетном выполнении, что делало его неуправляемым и вызывало исключение переполнения стека.

Чтобы избежать этого, мне пришлось разбить родословную искры, чтобы число этапов не увеличивалось с каждым запустить (даже если они пропущены)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...