У меня есть искровой фрейм с 40+ столбцами. и миллионы строк.
Я хочу создать еще один столбец, в котором, скажем, 5 столбцов из указанного выше кадра данных, передать каждую строку из 5 столбцов в отдельный Api (который принимает эти 5 значений и возвращает некоторые данные) и сохранить результат в столбце.
Для простоты я использую следующий пример:
Скажем, у меня есть следующий фрейм данных. И я хочу отправить каждую строку «food» и «price» в API, который возвращает результат, и он хранится в отдельном столбце под названием «объединить»
Введите:
+----+------+-----+
|name|food |price|
+----+------+-----+
|john|tomato|1.99 |
|john|carrot|0.45 |
|bill|apple |0.99 |
|john|banana|1.29 |
|bill|taco |2.59 |
+----+------+-----+
Выход:
+----+------+-----+----------+
|name|food |price|combined |
+----+------+-----+----------+
|john|tomato|1.99 |abcd |
|john|carrot|0.45 |fdg |
|bill|apple |0.99 |123fgfg |
|john|banana|1.29 |fgfg4wf |
|bill|taco |2.59 |gfg45gn |
+----+------+-----+----------+
Я создал UDF для просмотра каждой строки:
val zip = udf {
(food: String, price: Double) =>
val nvIn = new NameValue
nvIn.put("Query.ID", 1234)
nvIn.put("Food", food)
nvIn.put("Price", price)
val nvOut = new NameValue
val code: Code = getTunnelsClient().execute("CombineData", nvIn, nvOut) // this is calling the external API
nvOut.get("CombineData") //this is stored the result column
}
def test(sc: SparkContext, sqlContext: SQLContext): Unit = {
import sqlContext.implicits._
val df = Seq(
("john", "tomato", 1.99),
("john", "carrot", 0.45),
("bill", "apple", 0.99),
("john", "banana", 1.29),
("bill", "taco", 2.59)
).toDF("name", "food", "price")
val result = df.withColumn("combined", zip($"food", $"price"))
result.show(false)
}
Этот метод работает, однако я обеспокоен, так как я смотрю на каждую строку кадра данных, и у меня есть миллионы таких строк, он не будет столь же производительным в кластере
Есть ли другой способ сделать это (скажем, используя spark-sql), возможно, без использования udf?