Это кажется очень странной и специфической c проблемой, которая поставила меня в тупик.
При использовании DataFrame
, построенного по запросу spark.sql("select * from table")
к таблице Hive, я получаю исключение тайм-аута всякий раз, когда пытаюсь использовать HTTP-клиента в шаге преобразования или действия на этом DataFrame
.
Пример:
import scalaj.http._
import org.apache.spark.sql.SparkSession
object Example {
def postDoc(doc: String): Unit = {
val resp = Http("https://example.com/endpoint")
.postData(doc)
.header("content-type", "application/json")
.asString
}
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
import spark.implicits._
val df = spark.sql("select id, json_doc from some_table")
df.map(r => r.getAs[String]("json_doc")).foreach(postDoc _)
}
}
Однако я могу подключиться к службе через DataFrame
, созданный вручную; т.е. Seq((1, "{\"a\": 1}")).toDF("id", "json_doc").foreach(postDoc _)
.
Я также пытался создать временные таблицы и использовать spark.sql
для выбора из них; который работает на DataFrame
, который я создаю вручную, но не на тех, которые получены из таблицы Hive.
Мой частичный build.sbt
scalaVersion := "2.11.12"
libraryDependencies ++= {
val sparkVersion = "2.1.3"
Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.scalaj" %% "scalaj-http" % "2.4.2"
)
}
Stacktrace
java.net.SocketTimeoutException: Read timed out
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at sun.net.www.protocol.http.HttpURLConnection$10.run(HttpURLConnection.java:1950)
at sun.net.www.protocol.http.HttpURLConnection$10.run(HttpURLConnection.java:1945)
at java.security.AccessController.doPrivileged(Native Method)
at sun.net.www.protocol.http.HttpURLConnection.getChainedException(HttpURLConnection.java:1944)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1514)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:352)
at scalaj.http.HttpRequest.scalaj$http$HttpRequest$$doConnection(Http.scala:367)
at scalaj.http.HttpRequest.exec(Http.scala:343)
at scalaj.http.HttpRequest.asString(Http.scala:492)
at com.gm.avalanche.collect.Collector$.postDoc(Collector.scala:34)
at com.gm.avalanche.collect.Collector$$anonfun$sqlTest$2.apply(Collector.scala:71)
at com.gm.avalanche.collect.Collector$$anonfun$sqlTest$2.apply(Collector.scala:71)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735)
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1593)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getInputStream(HttpsURLConnectionImpl.java:268)
at scalaj.http.HttpRequest.scalaj$http$HttpRequest$$doConnection(Http.scala:365)
... 17 more