Spark агрегатные строки с пользовательской функцией - PullRequest
0 голосов
/ 28 сентября 2018

Для простоты предположим, что у нас есть кадр данных, содержащий следующие данные:

+----------+---------+----------+----------+
|firstName |lastName |Phone     |Address   |
+----------+---------+----------+----------+
|firstName1|lastName1|info1     |info2     |
|firstName1|lastName1|myInfo1   |dummyInfo2|
|firstName1|lastName1|dummyInfo1|myInfo2   |
+----------+---------+----------+----------+

Как объединить все группировки строк по (firstName, lastName) и сохранить в столбцах данные только для телефона и адреса, начинающиеся с «my», чтобы получить следующее:

+----------+---------+----------+----------+
|firstName |lastName |Phone     |Address   |
+----------+---------+----------+----------+
|firstName1|lastName1|myInfo1   |myInfo2   |
+----------+---------+----------+----------+

Может, мне использовать функцию agg с пользовательским UDAF?Но как я могу это реализовать?

Примечание: я использую Spark 2.2 вместе со Scala 2.11

Спасибо за ваше время

Ответы [ 2 ]

0 голосов
/ 28 сентября 2018

Вы можете использовать groupBy и collect_set функцию агрегирования и использовать udf функцию для фильтрации первой строки, которая начинается с "my"

import org.apache.spark.sql.functions._
def myudf = udf((array: Seq[String]) => array.filter(_.startsWith("my")).head)

df.groupBy("firstName ", "lastName")
  .agg(myudf(collect_set("Phone")).as("Phone"), myudf(collect_set("Address")).as("Address"))
  .show(false)

, котораядолжен дать вам

+----------+---------+-------+-------+
|firstName |lastName |Phone  |Address|
+----------+---------+-------+-------+
|firstName1|lastName1|myInfo1|myInfo2|
+----------+---------+-------+-------+

Надеюсь, ответ будет полезным

0 голосов
/ 28 сентября 2018

Если задействованы только два столбца, вместо UDF можно использовать фильтрацию и объединение:

val df = List(
  ("firstName1", "lastName1", "info1", "info2"),
  ("firstName1", "lastName1", "myInfo1", "dummyInfo2"),
  ("firstName1", "lastName1", "dummyInfo1", "myInfo2")
).toDF("firstName", "lastName", "Phone", "Address")

val myPhonesDF = df.filter($"Phone".startsWith("my"))
val myAddressDF = df.filter($"Address".startsWith("my"))

val result = myPhonesDF.alias("Phones").join(myAddressDF.alias("Addresses"), Seq("firstName", "lastName"))
    .select("firstName", "lastName", "Phones.Phone", "Addresses.Address")
result.show(false)

Вывод:

+----------+---------+-------+-------+
|firstName |lastName |Phone  |Address|
+----------+---------+-------+-------+
|firstName1|lastName1|myInfo1|myInfo2|
+----------+---------+-------+-------+

Для многих столбцов, когда ожидается только одна строка, напримерМожно использовать конструкцию:

  val columnsForSearch = List("Phone", "Address")
  val minExpressions = columnsForSearch.map(c => min(when(col(c).startsWith("my"), col(c)).otherwise(null)).alias(c))
  df.groupBy("firstName", "lastName").agg(minExpressions.head, minExpressions.tail: _*)

Вывод одинаков.

UDF с двумя параметрами, например:

  val twoParamFunc = (firstName: String, Phone: String) => firstName + ": " + Phone
  val twoParamUDF = udf(twoParamFunc)
  df.select(twoParamUDF($"firstName", $"Phone")).show(false)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...