Я новичок в Spark, и у меня есть Apache SparkSQL DataFrame df
с 4 столбцами, имеющий следующую схему:
root
|-- _id: string (nullable = false)
|-- _title: string (nullable = false)
|-- _published-at: date (nullable = false)
|-- p: array (nullable = true)
| |-- element: string (containsNull = true)
df
содержит много (около миллиона) новостных статей со столбцамисодержит для каждой записи: уникальный идентификатор (_id), заголовок (_title), дату публикации (_published-at) и строковый массив текста параграфов в каждой статье (p).
Теперь я хотел бы преобразовать столбец "p" из его текущего формата Array[String]
абзацев статьи в объединенный String
полного текста статьи, где преобразование представляет собой простое отображение, в котором элементы абзаца объединяются с пробелом.("") между ними, в результате чего новый пятый столбец String
добавляется к df
.Т.е. как то так:
df.withColumn(df.(col"p").map(_.mkString(" ")).alias("fullarticle"))
который не работает.Однако это кажется тривиальной проблемой, но я, должно быть, что-то не так понял.В пакете Spark functions
можно найти много функций, но ни одна из них здесь не подходит.Должен ли я как-то использовать «Пользовательские функции» (UDF)?Лучше всего избежать этого, если это возможно.
Можно преобразовать это в String
, что приведет к новому Dataset[String] dsFullArticles
, выполнив:
dsFullArticles = df.select(col("p").as[Array[String]]).map(_.mkString(" ")).alias("fullarticle")
(.as[Array[String]]
кажется, что необходимо развернуть WrappedArray
, который фактически оборачивает каждый элемент Array[String]
в столбце "p").Но как вместо этого добавить dsFullArticles
как новый столбец к df
?
После этого я также хотел бы найти длину самого длинного слова для каждой статьи в "полной статье""и добавьте его как шестой столбец к df
:
// Split each article in an array of its words
val dsFullArticlesArrayOfWords = dsFullArticles.map(s => s.split(" "))
// Find number of characters of longest word in article, 0 if article is empty
val dsMaxWordLength =
dsFullArticlesArrayOfWords.map(s => (s.map(w => w.length()) match {
case x if x.isEmpty => 0
case x => x.max
}))
Вышеприведенный код также работает, создавая Dataset[int]
, но как, аналогично, добавить его как столбец к df
?Та же проблема здесь.При наличии всех в одном DataFrame df
было бы легко делать различные выборки SQL, фильтрацию и т. Д.