Мы запускаем потоковое приложение Spark (версия Spark - 2.4.3) и извлекаем конфигурацию из базы данных postgres, используя запланированные потоки. Мы делаем это, используя spark
spark.read().format("jdbc").options(options).load()
Проблема в том, что мы замечаем, что в течение нескольких итераций запрос просто зависает. Ничего не происходит. Вызовы после этого все еще работают, и потоковые операции продолжаются. Я извлек дамп потока, и вот что я получил.
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.read(InputRecord.java:503)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975) => holding Monitor(java.lang.Object@1651260651})
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
sun.security.ssl.AppInputStream.read(AppInputStream.java:105) => holding Monitor(sun.security.ssl.AppInputStream@1497069430})
org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:143)
org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:112)
org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:71)
org.postgresql.core.PGStream.ReceiveChar(PGStream.java:282)
org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1803)
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:255) => holding Monitor(org.postgresql.core.v3.QueryExecutorImpl@2049145458})
org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:570)
org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:420)
org.postgresql.jdbc2.AbstractJdbc2Statement.executeQuery(AbstractJdbc2Statement.java:305)
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
org.apache.spark.scheduler.Task.run(Task.scala:121)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
Есть идеи, почему это происходит?