Отправка HTTP-запросов на Spark с использованием foreachPartition - PullRequest
1 голос
/ 22 октября 2019

Нужна помощь, чтобы понять поведение нижеприведенного в Spark (с использованием Scala и Databricks)

У меня есть некоторый фрейм данных (чтение из S3, если это имеет значение), и я бы отправил эти данные, отправив HTTP-запросы на публикациюпартиями по 1000 (максимум). Поэтому я перераспределил фрейм данных, чтобы в каждом разделе было не более 1000 записей. Кроме того, для каждой строки создан столбец json (поэтому мне нужно только потом поместить их в массив)

Проблема в том, чтобы делать запросы. Я создал следующий класс Serializable, используя следующий код

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.http.client.methods.HttpPost
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.HttpHeaders
import org.apache.http.entity.StringEntity
import org.apache.commons.io.IOUtils

object postObject extends Serializable{
  val client = HttpClientBuilder.create().build()
  val post = new HttpPost("https://my-cool-api-endpoint")
  post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
  def makeHttpCall(row: Iterator[Row]) = {
      val json_str = """{"people": [""" + row.toSeq.map(x => x.getAs[String]("json")).mkString(",") + "]}"      
      post.setEntity(new StringEntity(json_str))
      val response = client.execute(post)
      val entity = response.getEntity()
      println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
      println(IOUtils.toString(entity.getContent()))
  }
}

Теперь, когда я попробую следующее:

postObject.makeHttpCall(data.head(2).toIterator)

Он работает как шарм. Запросы обрабатываются, на экране выводятся некоторые данные, и мой API получает эти данные.

Но когда я пытаюсь поместить их в раздел foreach:

data.foreachPartition { x => 
  postObject.makeHttpCall(x)
}

Ничего не происходит. Нет вывода на экран, ничего не приходит в моем API. Если я попытаюсь повторить его, почти все этапы просто пропускаются. Я считаю, что по какой-то причине мне просто лень оценивать мои запросы, а не выполнять их на самом деле. Я не понимаю, почему и как это заставить.

1 Ответ

1 голос
/ 27 октября 2019

postObject имеет 2 поля: client и post, которые должны быть сериализованы.

Я не уверен, что client правильно сериализовано. post объект потенциально видоизменен из нескольких разделов (на одном и том же работнике). Здесь многое может пойти не так.

Я предлагаю попробовать удалить postObject и вставить его тело в foreachPartition напрямую.

Добавление:

Попытался запустить его сам:

sc.parallelize((1 to 10).toList).foreachPartition(row => {
        val client = HttpClientBuilder.create().build()
        val post = new HttpPost("https://google.com")
        post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
        val json_str = """{"people": [""" + row.toSeq.map(x => x.toString).mkString(",") + "]}"
        post.setEntity(new StringEntity(json_str))
        val response = client.execute(post)
        val entity = response.getEntity()
        println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
        println(IOUtils.toString(entity.getContent()))
      })

Запускал его как локально, так и в кластере. Он успешно завершается и печатает 405 ошибок в рабочие журналы. Поэтому запросы определенно попадают на сервер.

foreachPartition ничего не возвращает в результате. Для устранения проблемы вы можете изменить ее на mapPartitions:

val responseCodes = sc.parallelize((1 to 10).toList).mapPartitions(row => {
        val client = HttpClientBuilder.create().build()
        val post = new HttpPost("https://google.com")
        post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
        val json_str = """{"people": [""" + row.toSeq.map(x => x.toString).mkString(",") + "]}"
        post.setEntity(new StringEntity(json_str))
        val response = client.execute(post)
        val entity = response.getEntity()
        println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
        println(IOUtils.toString(entity.getContent()))
        Iterator.single(response.getStatusLine.getStatusCode)
      }).collect()

println(responseCodes.mkString(", "))

Этот код возвращает список кодов ответов, чтобы вы могли проанализировать его. Для меня это печатает 405, 405, как и ожидалось.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...