Как накапливать результаты в Seq для последующей обработки в спарк? - PullRequest
0 голосов
/ 12 апреля 2019

Я пытаюсь обработать каждую строку в кадре данных Spark и преобразовать его в другой кадр данных.По сути, у меня есть кадр A, который содержит столбец («id») и другой столбец, который является массивом предложений.Я хотел бы преобразовать это в другой фрейм данных с каждым предложением, однозначно идентифицированным строкой идентификатора «docID: count».Мой код:

var sentencesCollection:Seq[SentenceIdentifier] = Seq()
tokenized.foreach(row => {
    val docID = row.getAs[String]("id")
    val sentences = row.getAs[Seq[String]]("sentences")
    var count:Integer = 0
    for (elem <- sentences) {
        val sentenceID:String = docID + ":" + count
        count = count + 1

        val si = SentenceIdentifier(sentenceID, elem)
        sentencesCollection = sentencesCollection :+ si
     }

})

println(sentencesCollection.length)

Однако оператор println печатает «0».

Любая идея, как я могу иметь предложенияКоллекция, последовательность, которую я могу обрабатывать дальше?(Возможно, думал, что вызов .toDF ()).

1 Ответ

5 голосов
/ 12 апреля 2019

Как @ Луис Мигель Мехиа Суарес очень хорошо объяснил в комментарии, любая функция, переданная в качестве аргумента DataFrame.foreach, будет выполняться на одном или нескольких компьютерах-исполнителях, а не на драйвере, выполняющем этот код,поэтому любое изменение изменчивого состояния будет потеряно (оно будет выполнено для исполнителей и выброшено).

При работе с DataFrames вы всегда должны думать о том, как преобразовать один DF вдругой, использующий только API Spark для этого.Эти преобразования являются «инструкциями» для Spark, которые должны выполняться им распределенно.

В этом случае ваше требование может быть достигнуто с учетом этого.Вы хотите:

  • Разобрать записей, то есть превратить каждую запись, содержащую массив, в несколько записей, каждая с одним элементом из массива
  • Отслеживать позиция элемента массива в разобранном виде
  • Concat , которая соответствует существующему значению столбца "id" с разделителем ":"

Каждое из этих действий может быть достигнуто с помощью одной из функций Spark , предназначенных для выполнения в столбцах DataFrame.Вот как выглядит решение:

import org.apache.spark.sql.functions._
import spark.implicits._

// Sample data
val tokenized = Seq(
  (1, Array("Hi there", "Hello there")),
  (2, Array("Bye now")),
  (3, Array("Thank you", "Thanks", "Many thanks"))
).toDF("id", "sentences")

val result = tokenized
   // we'll use posexplode function which creates "pos" and "col" columns
  .select($"id", posexplode($"sentences")) 
   // we'll create a new docID column using concat function, and rename "col"
  .select(concat($"id", lit(":"), $"pos") as "docID", $"col" as "sentence")

result.show()
// +-----+-----------+
// |docID|   sentence|
// +-----+-----------+
// |  1:0|   Hi there|
// |  1:1|Hello there|
// |  2:0|    Bye now|
// |  3:0|  Thank you|
// |  3:1|     Thanks|
// |  3:2|Many thanks|
// +-----+-----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...