Spark Отправить DataFrame как тело HTTP-запроса Post - PullRequest
0 голосов
/ 02 мая 2018

У меня есть фрейм данных, который я хочу отправить как тело запроса HTTP Post, какой лучший Sparky способ сделать это?
Как я могу контролировать количество HTTP-запросов? Если количество записей становится больше, есть ли способ разделить отправляющий фрейм данных на несколько вызовов HTTP Post?

скажем, мой фрейм данных выглядит так:

+--------------------------------------+------------+------------+------------------+
|               user_id                |    city    | user_name  |   facebook_id    |
+--------------------------------------+------------+------------+------------------+
| 55c3c59d-0163-46a2-b495-bc352a8de883 | Toronto    | username_x | 0123482174440907 |
| e2ddv22d-4132-c211-4425-9933aa8de454 | Washington | username_y | 0432982476780234 |
+--------------------------------------+------------+------------+------------------+

Я хочу иметь user_id и facebook_id в теле HTTP-запроса Post к этой конечной точке localhost:8080/api/spark

1 Ответ

0 голосов
/ 02 мая 2018

Вы можете достичь этого, используя метод foreachPartition на Dataframe. Здесь я предполагаю, что вы хотите сделать вызов Http для каждой строки в кадре данных параллельно. foreachPartition работает на каждом разделе Dataframe параллельно. Если вы хотите объединить несколько строк в один пост-вызов HTTP, это также возможно, изменив сигнатуру метода makeHttpCall с Row на Iterator[Row]

  def test(): Unit = {
    val df: DataFrame = null
    df.foreachPartition(_.foreach(x => makeHttpCall(x)))
  }

  def makeHttpCall(row: Row) = {
    val json = Json.obj("user_name" -> row.getString(2), "facebook_id" -> row.getString(3))
    /**
      * code make Http call
      */
  }

для массового запроса Http makeHttpCall. убедитесь, что у вас есть достаточное количество разделов в фрейме данных, чтобы каждый раздел был достаточно маленьким для выполнения запроса Http Post.

import org.apache.spark.sql.{DataFrame, Row}
import play.api.libs.json.Json

  def test(): Unit = {
    val df: DataFrame = null
    df.foreachPartition(x => makeHttpCall(x))
  }

  def makeHttpCall(row: Iterator[Row]) = {
    val json = Json.arr(row.toSeq.map(x => Json.obj("user_name" -> x.getString(2), "facebook_id" -> x.getString(3))))
    /**
      * code make Http call
      */
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...