Spark Scala FoldLeft, приводящий к StackOverflow при запуске в кластере - PullRequest
0 голосов
/ 04 сентября 2018

Я использую следующий код, чтобы изменить форму информационного кадра, используя его строки для этого изменения формы.

Фрейм данных содержит дату, когда продукт меняет свой идентификатор, но чтобы присоединить его к огромному другому фрейму данных, содержащему транзакции, мне нужен новый столбец, который определяет допустимый диапазон идентификаторов.

Например, если продукт A изменится на продукт B, вступивший в силу 01/01 дня, а затем изменится на продукт C, вступивший в силу 03/01, мне понадобится дата начала и окончания в одной строке, чтобы я мог присоединиться к нему с огромной фильтрацией данных по транзакциям по датам, когда продукт фактически является B (или C), поэтому я могу правильно переименовать продукты в их действительный действительный идентификатор.

Другая часть информации, df_MPC, содержит около 800 строк и не будет расти намного больше.

Итак, я пытаюсь использовать подход (который работает при запуске в среде разработки) - сложить влево.

Сводная версия фрейма данных MPC будет:

Product | Date      | NewProd
A       | 01/01/2018| B
B       | 03/01/2018| C

Цель:

Product | Date      | NewProd | OriginalProd | EndDate
A       | 01/01/2018| B       | A            | 03/01
B       | 03/01/2018| C       | A            | 31/12-9999

(столбец OriginalProd необходим для окончательного соединения с фреймом данных транзакций)

И код, приводящий к стекопотоку, выглядит следующим образом:

var rowList = new ListBuffer[Row]()
val it = df_MPC_SOURCE.toLocalIterator()
while (it.hasNext) { rowList += it.next()}

val df_MPC_TRANSFORMED = rowList.reverse
  .foldLeft(df_MPC_pre_edit_source: DataFrame)((acc, elem) => acc
    .withColumn("EndDate",
      when((col("N_DISTRIBUTOR_CODE") === elem.getAs("N_DISTRIBUTOR_CODE"))
        && col("N_CONTRACT_CODE") === elem.getAs("N_CONTRACT_CODE")
        && (col("N_PRODUCT_ID_NEW") === elem.getAs("N_PRODUCT_ID")),
        elem.getAs("D_EFFECTIVE_CHANGE"))
        .otherwise(col("EndDate")))
    .withColumn("OriginalProd",
      when((col("N_DISTRIBUTOR_CODE") === elem.getAs("N_DISTRIBUTOR_CODE"))
        && col("N_CONTRACT_CODE") === elem.getAs("N_CONTRACT_CODE")
        && (col("MPC_original") === elem.getAs("N_PRODUCT_ID_NEW")),
        elem.getAs("N_PRODUCT_ID"))
        .otherwise(col("OriginalProd")))
  )

Этот код преобразует исходный кадр данных (пример, приведенный выше) в объективный кадр данных (пример также выше).

Он делает это, перебирая все свои 800 строк в отсортированном виде (по дате) и для каждой из своих строк:

  • Изменить действительную дату для всех продуктов, соответствующих данной строке
  • Обновите исходный идентификатор продукта в случае, если мы найдем промежуточное товар. Например, если у нас есть продукт, который переходит с идентификатора «А» на «B» и от «B» до «C» позже, нам понадобится столбец с исходный идентификатор продукта (в данном случае «A») для возможности присоединиться к нашему результат с исходной таблицей транзакций, которая будет содержать только код продукта "A".

И выдается ошибка, когда этот код используется в кластере:

Exception in thread "main" java.lang.StackOverflowError
        at scala.collection.GenSetLike$class.apply(GenSetLike.scala:44)
        at scala.collection.AbstractSet.apply(Set.scala:47)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:334)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)

Как я могу заставить этот код работать в кластере так же, как он работает правильно локально? Спасибо!

Ответы [ 2 ]

0 голосов
/ 04 сентября 2018

Мне понадобилось время, чтобы понять, что ты пытался сделать. Я думаю, что вы можете сделать то же самое с более простым подходом.

Это не объясняет, почему ваш код не работает, но ваш foldleft можно заменить запросом spark sql, что-то вроде этого:

df_MPC_SOURCE.registerTempTable("mpc_source")

val test = sqlContext.sql(
  """select c1.N_PRODUCT_ID,c1.D_EFFECTIVE_CHANGE,c1.N_PRODUCT_ID_NEW,
    |coalesce(c2.D_EFFECTIVE_CHANGE,c1.MPC_endDate) as MPC_endDate,
    |coalesce(c3.N_PRODUCT_ID,c1.MPC_original) as MPC_original
    |from mpc_source c1
    |left join mpc_source c2 on c1.N_DISTRIBUTOR_CODE=c2.N_DISTRIBUTOR_CODE
    |and c1.N_CONTRACT_CODE=c2.N_CONTRACT_CODE
    |and c1.N_PRODUCT_ID_NEW=c2.N_PRODUCT_ID
    |left join mpc_source c3 on c1.N_DISTRIBUTOR_CODE=c3.N_DISTRIBUTOR_CODE
    |and c1.N_CONTRACT_CODE=c3.N_CONTRACT_CODE
    |and c1.MPC_original = c3.N_PRODUCT_ID_NEW
  """.stripMargin)

Надеюсь, это поможет вам.

0 голосов
/ 04 сентября 2018

Я бы проверил различия в конфигах Spark executor на локальной машине и кластере. Может случиться так, что число потоков (задач / ядер), созданных на локальных машинах, может быть меньше, чем количество задач, создаваемых в исполнителе в кластере. Уменьшение количества ядер на одного исполнителя уменьшит количество потоков, созданных в jvm исполнителя, и, следовательно, пространство, занимаемое стеком потоков. В качестве альтернативы, вы можете попробовать увеличить объем памяти для каждого исполнителя. было бы хорошо, чтобы конфигурация исполнителей была одинаковой на обеих машинах, а затем посмотреть, воспроизводится ли проблема.

...