Отправьте несколько столбцов в Spark Dataframe на внешний API и сохраните результат в отдельном столбце. - PullRequest
0 голосов
/ 12 марта 2019

У меня есть искровой фрейм с 40+ столбцами. и миллионы строк. Я хочу создать еще один столбец, в котором, скажем, 5 столбцов из указанного выше кадра данных, передать каждую строку из 5 столбцов в отдельный Api (который принимает эти 5 значений и возвращает некоторые данные) и сохранить результат в столбце.

Для простоты я использую следующий пример: Скажем, у меня есть следующий фрейм данных. И я хочу отправить каждую строку «food» и «price» в API, который возвращает результат, и он хранится в отдельном столбце под названием «объединить»

Введите:

+----+------+-----+
|name|food  |price|
+----+------+-----+
|john|tomato|1.99 |
|john|carrot|0.45 |
|bill|apple |0.99 |
|john|banana|1.29 |
|bill|taco  |2.59 |
+----+------+-----+

Выход:

+----+------+-----+----------+
|name|food  |price|combined  |
+----+------+-----+----------+
|john|tomato|1.99 |abcd      |
|john|carrot|0.45 |fdg       |
|bill|apple |0.99 |123fgfg   |
|john|banana|1.29 |fgfg4wf   |
|bill|taco  |2.59 |gfg45gn   |
+----+------+-----+----------+

Я создал UDF для просмотра каждой строки:

val zip = udf {
(food: String, price: Double) =>
    val nvIn = new NameValue
    nvIn.put("Query.ID", 1234)
    nvIn.put("Food", food)
    nvIn.put("Price", price)
    val nvOut = new NameValue

    val code: Code = getTunnelsClient().execute("CombineData", nvIn, nvOut) // this is calling the external API
    nvOut.get("CombineData")     //this is stored the result column
  }

  def test(sc: SparkContext, sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._
    val df = Seq(
      ("john", "tomato", 1.99),
      ("john", "carrot", 0.45),
      ("bill", "apple", 0.99),
      ("john", "banana", 1.29),
      ("bill", "taco", 2.59)
    ).toDF("name", "food", "price")


    val result = df.withColumn("combined", zip($"food", $"price"))
    result.show(false)

  }

Этот метод работает, однако я обеспокоен, так как я смотрю на каждую строку кадра данных, и у меня есть миллионы таких строк, он не будет столь же производительным в кластере

Есть ли другой способ сделать это (скажем, используя spark-sql), возможно, без использования udf?

1 Ответ

0 голосов
/ 12 марта 2019

Я бы настоятельно рекомендовал использовать тип safe spark Dataset api для отправки ваших строк данных в api.

Это включает в себя синтаксический анализ Dataframe строк в scala case class с использованием функции as, а затем выполнение функции map на вашем Dataset\Dataframe для отправки его в API.и верните еще один case class, представляющий ваш Output.

Хотя строго spark sql с использованием API Dataset все же позволяет вам воспользоваться большинством оптимизаций, доступных в spark sql

case class Input(name: String, food: String, price: Double)
case class Output(name: String, food: String, price: Double, combined: String)

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

df.as[Input].map(input => {
    val nvIn = new NameValue
    nvIn.put("Query.ID", 1234)
    nvIn.put("Food", input.food)
    nvIn.put("Price", input.price)
    val nvOut = new NameValue
    getTunnelsClient().execute("CombineData", nvIn, nvOut)
    Output(input.name, input.food, input.price, nvOut.get("CombineData"))
}).show(false)
...