Как @ Луис Мигель Мехиа Суарес очень хорошо объяснил в комментарии, любая функция, переданная в качестве аргумента DataFrame.foreach
, будет выполняться на одном или нескольких компьютерах-исполнителях, а не на драйвере, выполняющем этот код,поэтому любое изменение изменчивого состояния будет потеряно (оно будет выполнено для исполнителей и выброшено).
При работе с DataFrames вы всегда должны думать о том, как преобразовать один DF вдругой, использующий только API Spark для этого.Эти преобразования являются «инструкциями» для Spark, которые должны выполняться им распределенно.
В этом случае ваше требование может быть достигнуто с учетом этого.Вы хотите:
- Разобрать записей, то есть превратить каждую запись, содержащую массив, в несколько записей, каждая с одним элементом из массива
- Отслеживать позиция элемента массива в разобранном виде
- Concat , которая соответствует существующему значению столбца "id" с разделителем ":"
Каждое из этих действий может быть достигнуто с помощью одной из функций Spark , предназначенных для выполнения в столбцах DataFrame.Вот как выглядит решение:
import org.apache.spark.sql.functions._
import spark.implicits._
// Sample data
val tokenized = Seq(
(1, Array("Hi there", "Hello there")),
(2, Array("Bye now")),
(3, Array("Thank you", "Thanks", "Many thanks"))
).toDF("id", "sentences")
val result = tokenized
// we'll use posexplode function which creates "pos" and "col" columns
.select($"id", posexplode($"sentences"))
// we'll create a new docID column using concat function, and rename "col"
.select(concat($"id", lit(":"), $"pos") as "docID", $"col" as "sentence")
result.show()
// +-----+-----------+
// |docID| sentence|
// +-----+-----------+
// | 1:0| Hi there|
// | 1:1|Hello there|
// | 2:0| Bye now|
// | 3:0| Thank you|
// | 3:1| Thanks|
// | 3:2|Many thanks|
// +-----+-----------+