Тайм-аут клиента Http при использовании DataFrame, возвращенного из запроса Hive - PullRequest
0 голосов
/ 08 января 2020

Это кажется очень странной и специфической c проблемой, которая поставила меня в тупик.

При использовании DataFrame, построенного по запросу spark.sql("select * from table") к таблице Hive, я получаю исключение тайм-аута всякий раз, когда пытаюсь использовать HTTP-клиента в шаге преобразования или действия на этом DataFrame.

Пример:

import scalaj.http._
import org.apache.spark.sql.SparkSession

object Example {

  def postDoc(doc: String): Unit = {
    val resp = Http("https://example.com/endpoint")
      .postData(doc)
      .header("content-type", "application/json")
      .asString
   }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    import spark.implicits._
    val df = spark.sql("select id, json_doc from some_table")
    df.map(r => r.getAs[String]("json_doc")).foreach(postDoc _)
  }
}

Однако я могу подключиться к службе через DataFrame, созданный вручную; т.е. Seq((1, "{\"a\": 1}")).toDF("id", "json_doc").foreach(postDoc _).

Я также пытался создать временные таблицы и использовать spark.sql для выбора из них; который работает на DataFrame, который я создаю вручную, но не на тех, которые получены из таблицы Hive.

Мой частичный build.sbt

scalaVersion := "2.11.12"

libraryDependencies ++= {
  val sparkVersion =  "2.1.3"
  Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
    "org.scalaj" %% "scalaj-http" % "2.4.2"
  )
}

Stacktrace

java.net.SocketTimeoutException: Read timed out
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at sun.net.www.protocol.http.HttpURLConnection$10.run(HttpURLConnection.java:1950)
        at sun.net.www.protocol.http.HttpURLConnection$10.run(HttpURLConnection.java:1945)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.net.www.protocol.http.HttpURLConnection.getChainedException(HttpURLConnection.java:1944)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1514)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:352)
        at scalaj.http.HttpRequest.scalaj$http$HttpRequest$$doConnection(Http.scala:367)
        at scalaj.http.HttpRequest.exec(Http.scala:343)
        at scalaj.http.HttpRequest.asString(Http.scala:492)
        at com.gm.avalanche.collect.Collector$.postDoc(Collector.scala:34)
        at com.gm.avalanche.collect.Collector$$anonfun$sqlTest$2.apply(Collector.scala:71)
        at com.gm.avalanche.collect.Collector$$anonfun$sqlTest$2.apply(Collector.scala:71)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:100)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
        at sun.security.ssl.InputRecord.read(InputRecord.java:503)
        at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
        at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
        at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735)
        at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1593)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getInputStream(HttpsURLConnectionImpl.java:268)
        at scalaj.http.HttpRequest.scalaj$http$HttpRequest$$doConnection(Http.scala:365)
        ... 17 more

1 Ответ

0 голосов
/ 10 января 2020

Оказывается, что сокет закрывался удаленно контроллером Ingress, работающим перед средой Kubernates, в которой работает экземпляр Elasticsearch. Он был установлен по умолчанию на время ожидания в одну минуту.

...