Я использую следующий код, чтобы изменить форму информационного кадра, используя его строки для этого изменения формы.
Фрейм данных содержит дату, когда продукт меняет свой идентификатор, но чтобы присоединить его к огромному другому фрейму данных, содержащему транзакции, мне нужен новый столбец, который определяет допустимый диапазон идентификаторов.
Например, если продукт A изменится на продукт B, вступивший в силу 01/01 дня, а затем изменится на продукт C, вступивший в силу 03/01, мне понадобится дата начала и окончания в одной строке, чтобы я мог присоединиться к нему с огромной фильтрацией данных по транзакциям по датам, когда продукт фактически является B (или C), поэтому я могу правильно переименовать продукты в их действительный действительный идентификатор.
Другая часть информации, df_MPC, содержит около 800 строк и не будет расти намного больше.
Итак, я пытаюсь использовать подход (который работает при запуске в среде разработки) - сложить влево.
Сводная версия фрейма данных MPC будет:
Product | Date | NewProd
A | 01/01/2018| B
B | 03/01/2018| C
Цель:
Product | Date | NewProd | OriginalProd | EndDate
A | 01/01/2018| B | A | 03/01
B | 03/01/2018| C | A | 31/12-9999
(столбец OriginalProd необходим для окончательного соединения с фреймом данных транзакций)
И код, приводящий к стекопотоку, выглядит следующим образом:
var rowList = new ListBuffer[Row]()
val it = df_MPC_SOURCE.toLocalIterator()
while (it.hasNext) { rowList += it.next()}
val df_MPC_TRANSFORMED = rowList.reverse
.foldLeft(df_MPC_pre_edit_source: DataFrame)((acc, elem) => acc
.withColumn("EndDate",
when((col("N_DISTRIBUTOR_CODE") === elem.getAs("N_DISTRIBUTOR_CODE"))
&& col("N_CONTRACT_CODE") === elem.getAs("N_CONTRACT_CODE")
&& (col("N_PRODUCT_ID_NEW") === elem.getAs("N_PRODUCT_ID")),
elem.getAs("D_EFFECTIVE_CHANGE"))
.otherwise(col("EndDate")))
.withColumn("OriginalProd",
when((col("N_DISTRIBUTOR_CODE") === elem.getAs("N_DISTRIBUTOR_CODE"))
&& col("N_CONTRACT_CODE") === elem.getAs("N_CONTRACT_CODE")
&& (col("MPC_original") === elem.getAs("N_PRODUCT_ID_NEW")),
elem.getAs("N_PRODUCT_ID"))
.otherwise(col("OriginalProd")))
)
Этот код преобразует исходный кадр данных (пример, приведенный выше) в объективный кадр данных (пример также выше).
Он делает это, перебирая все свои 800 строк в отсортированном виде (по дате) и для каждой из своих строк:
- Изменить действительную дату для всех продуктов, соответствующих данной строке
- Обновите исходный идентификатор продукта в случае, если мы найдем промежуточное
товар. Например, если у нас есть продукт, который переходит с идентификатора «А» на
«B» и от «B» до «C» позже, нам понадобится столбец с
исходный идентификатор продукта (в данном случае «A») для возможности присоединиться к нашему
результат с исходной таблицей транзакций, которая будет содержать только
код продукта "A".
И выдается ошибка, когда этот код используется в кластере:
Exception in thread "main" java.lang.StackOverflowError
at scala.collection.GenSetLike$class.apply(GenSetLike.scala:44)
at scala.collection.AbstractSet.apply(Set.scala:47)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:334)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
Как я могу заставить этот код работать в кластере так же, как он работает правильно локально?
Спасибо!