Как извлечь значения из RDD на основе переданного параметра - PullRequest
0 голосов
/ 28 мая 2018

Я создал ключ-значение RDD, но я не уверен, как выбрать из него значения.

val mapdf = merchantData_df.rdd.map(row => {
    val Merchant_Name = row.getString(0)
    val Display_Name = row.getString(1)
    val Store_ID_name = row.getString(2)
    val jsonString = s"{Display_Name: $Display_Name, Store_ID_name: $Store_ID_name}"
    (Merchant_Name, jsonString)
})

scala> mapdf.take(4).foreach(println)
(Amul,{Display_Name: Amul, Store_ID_name: null})
(Nestle,{Display_Name: Nestle, Store_ID_name: null})
(Ace,{Display_Name: Ace , Store_ID_name: null})
(Acme ,{Display_Name: Acme Fresh Market, Store_ID_name: Acme Markets})

Теперь предположим, что моя входная строка для функции будет Amul, Моя ожидаемаявывод для DisplayName is Amul и другая функция для StoreID to return NULL.

Как мне этого добиться?

Я не хочу использовать SparkSQL для этой цели

1 Ответ

0 голосов
/ 29 мая 2018

При заданном входном кадре данных как

+-----------------+-----------------+-------------+
|Merchant_Name    |Display_Name     |Store_ID_name|
+-----------------+-----------------+-------------+
|Fitch            |Fitch            |null         |
|Kids             |Kids             |null         |
|Ace Hardware     |Ace Hardware     |null         |
| Fresh Market    |Acme  Market     |Acme Markets |
|Adventure        | Island          |null         |
+-----------------+-----------------+-------------+

Вы можете написать функцию со строковым параметром как

import org.apache.spark.sql.functions._
def filterRowsWithKey(key: String) = df.filter(col("Merchant_Name") === key).select("Display_Name", "Store_ID_name")

И вызов функции как

filterRowsWithKey("Fitch").show(false)

дастВы

+------------+-------------+
|Display_Name|Store_ID_name|
+------------+-------------+
|Fitch       |null         |
+------------+-------------+

Я надеюсь, что ответ полезен

Обновлено

Если вы хотите, чтобы первая строка в виде строки была возвращена из функции, тогда выможет выполнить

import org.apache.spark.sql.functions._
def filterRowsWithKey(key: String) = df.filter(col("Merchant_Name") === key).select("Display_Name", "Store_ID_name").first().mkString(",")

println(filterRowsWithKey("Fitch"))

, что должно дать вам

Fitch,null

выше, функция вызовет исключение, если переданный ключ не найден, поэтому для безопасности вы можете использовать следующую функцию

import org.apache.spark.sql.functions._
def filterRowsWithKey(key: String) = {
  val filteredDF = df.filter(col("Merchant_Name") === key).select("Display_Name", "Store_ID_name")
  if(filteredDF.count() > 0) filteredDF.first().mkString(",") else "key not found"
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...