Нужна помощь, чтобы понять поведение нижеприведенного в Spark (с использованием Scala и Databricks)
У меня есть некоторый фрейм данных (чтение из S3, если это имеет значение), и я бы отправил эти данные, отправив HTTP-запросы на публикациюпартиями по 1000 (максимум). Поэтому я перераспределил фрейм данных, чтобы в каждом разделе было не более 1000 записей. Кроме того, для каждой строки создан столбец json (поэтому мне нужно только потом поместить их в массив)
Проблема в том, чтобы делать запросы. Я создал следующий класс Serializable, используя следующий код
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.http.client.methods.HttpPost
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.HttpHeaders
import org.apache.http.entity.StringEntity
import org.apache.commons.io.IOUtils
object postObject extends Serializable{
val client = HttpClientBuilder.create().build()
val post = new HttpPost("https://my-cool-api-endpoint")
post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
def makeHttpCall(row: Iterator[Row]) = {
val json_str = """{"people": [""" + row.toSeq.map(x => x.getAs[String]("json")).mkString(",") + "]}"
post.setEntity(new StringEntity(json_str))
val response = client.execute(post)
val entity = response.getEntity()
println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
println(IOUtils.toString(entity.getContent()))
}
}
Теперь, когда я попробую следующее:
postObject.makeHttpCall(data.head(2).toIterator)
Он работает как шарм. Запросы обрабатываются, на экране выводятся некоторые данные, и мой API получает эти данные.
Но когда я пытаюсь поместить их в раздел foreach:
data.foreachPartition { x =>
postObject.makeHttpCall(x)
}
Ничего не происходит. Нет вывода на экран, ничего не приходит в моем API. Если я попытаюсь повторить его, почти все этапы просто пропускаются. Я считаю, что по какой-то причине мне просто лень оценивать мои запросы, а не выполнять их на самом деле. Я не понимаю, почему и как это заставить.