Apache Spark SQL: как оптимизировать цепочечное объединение для данных - PullRequest
1 голос
/ 12 апреля 2019

Мне нужно сделать левое соединение между основным фреймом данных и несколькими опорными фреймами, так что вычисление цепного соединения.И мне интересно, как сделать это действие эффективным и масштабируемым.

Метод 1 прост для понимания, который также является текущим методом, но я не удовлетворен, потому что все преобразования были объединены в цепочку и ожидали финальногодействие для запуска вычисления, если я продолжу добавлять преобразование и объем данных, в конце произойдет сбой spark, поэтому этот метод не масштабируется.

Метод 1:

  def pipeline(refDF1: DataFrame, refDF2: DataFrame, refDF3: DataFrame, refDF4: DataFrame, refDF5: DataFrame): DataFrame = {

  val transformations: List[DataFrame => DataFrame] = List(
    castColumnsFromStringToLong(ColumnsToCastToLong),
    castColumnsFromStringToFloat(ColumnsToCastToFloat),
    renameColumns(RenameMapping),
    filterAndDropColumns,
    joinRefDF1(refDF1),
    joinRefDF2(refDF2),
    joinRefDF3(refDF3),
    joinRefDF4(refDF4),
    joinRefDF5(refDF5),
    calculate()
  )

  transformations.reduce(_ andThen _)

  }

  pipeline(refDF1, refDF2, refDF3, refDF4, refDF5)(principleDF)

Метод 2: Я не нашел реального способа реализовать свою идею, но я надеюсь немедленно запустить вычисление каждого соединения.

в соответствии с моим тестом count () слишком тяжел для искры и бесполезен длямое приложение, но я не знаю, как запустить вычисление соединения с эффективным действием .Такого рода действия фактически являются ответом на этот вопрос.

  val joinedDF_1 = castColumnsFromStringToLong(principleDF, ColumnsToCastToLong)
  joinedDF_1.cache() // joinedDF is not always used multiple times, but for some data frame, it is, so I add cache() to indicate the usage
  joinedDF_1.count()  

  val joinedDF_2 = castColumnsFromStringToFloat(joinedDF_1, ColumnsToCastToFloat)
  joinedDF_2.cache()
  joinedDF_2.count()

  val joinedDF_3 = renameColumns(joinedDF_2, RenameMapping)
  joinedDF_3.cache()
  joinedDF_3.count()

  val joinedDF_4 = filterAndDropColumns(joinedDF_4)
  joinedDF_4.cache()
  joinedDF_4.count()

  ...

Ответы [ 2 ]

2 голосов
/ 12 апреля 2019

Если вы хотите форсировать вычисление заданного join (или любого другого преобразования, которое не является окончательным) в Spark, вы можете использовать простой show или count на вашем DataFrame. Этот вид конечных точек приведет к вычислению результата, потому что иначе просто невозможно выполнить действие.

Только после этого ваш DataFrame будет эффективно храниться в вашем кэше.

Как только вы закончите с данным DataFrame, не стесняйтесь, чтобы остаться без сопротивления. Это приведет к потере устойчивости ваших данных, если вашему кластеру потребуется больше места для дальнейших вычислений.

0 голосов
/ 12 апреля 2019

Вам необходимо перераспределить ваш набор данных по столбцам перед вызовом преобразования объединения.

Пример:

df1=df1.repartion(col("col1"),col("col2"))
df2=df2.repartion(col("col1"),col("col2"))
joinDF = df1.join(jf2,df1.col("col1").equals(df2.col("col1")) &....)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...