SQLException не распространяется на основной поток - Clojure - PullRequest
2 голосов
/ 28 января 2020

У меня проблема при выполнении запроса к Postgres. Если запрос занимает более 10 секунд или около того, возникает SQLException, как показано ниже

com.mchange.v2.c3p0.impl.NewPooledConnection@33d2d0dc handling a throwable.: org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:326)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:428)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:354)
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:169)
at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:117)
at com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.executeQuery(NewProxyPreparedStatement.java:1418)
at clojure.java.jdbc$execute_query_with_params.invokeStatic(jdbc.clj:993)
at clojure.java.jdbc$execute_query_with_params.invoke(jdbc.clj:987)
at clojure.java.jdbc$db_query_with_resultset_STAR_.invokeStatic(jdbc.clj:1016)
at clojure.java.jdbc$db_query_with_resultset_STAR_.invoke(jdbc.clj:996)
at clojure.java.jdbc$reducible_query$reify__15393.reduce(jdbc.clj:1272)
at clojure.core$reduce.invokeStatic(core.clj:6544)
at clojure.core$reduce.invoke(core.clj:6527)
at cenx.constellation.common$reducible__GT_chan$fn__978.invoke(common.clj:103)
at clojure.core$binding_conveyor_fn$fn__4676.invoke(core.clj:1938)
at clojure.lang.AFn.call(AFn.java:18)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at org.postgresql.core.PGStream.receiveChar(PGStream.java:290)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1963)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:300)
... 19 more

, а сейчас я просто хочу иметь возможность уведомить мой главный поток о том, что что-то произошло. Мой блок кода выглядит следующим образом:

(defn func (
[coll name]
    (let [ch (async/chan 100)]
        (future (if (not (Thread/interrupted))
            (try
                (let [  _ (log/info "Start Reading from" name)
                        i (reduce (fn [i v]
                                    (when (zero? (mod i 10000))
                                        (log/debug "row[" i "]: " name))
                                    (if (nil? v)
                                        (log/warn "skipping")
                                    (async/>!! ch [i v]))
                                    (inc i))
                            0 coll)
                        _ (log/info "End Reading, read" i "rows from" name)]
                (async/close! ch))
            (catch SQLException se
                (log/info "SQLException " (.getMessage se))
                (throw (SQLException. "SQLException in my catch block")))
            (finally
                (log/info "Thread is not interrupted")))
        (throw (InterruptedException. "Thread got interrupted"))))
    ch)))

Хотя исключение перехватывается в рабочем потоке, основной поток не перехватывает исключение, которое я выбрасываю изнутри блока перехвата. Я новичок в clojure и буду признателен за любые комментарии по этому

РЕДАКТИРОВАТЬ: Я добавляю метод, который я использую для создания и настройки пула соединений

(defn- pool [{:keys [subprotocol host port db user pool] :as _spec}] {:datasource (doto (ComboPooledDataSource.) (.setJdbcUrl (str "jdbc:" subprotocol "://" host ":" port "/" db)) (.setUser user) (.setPassword (get-postgres-password)) (.setInitialPoolSize (:min-size pool)) (.setMinPoolSize (:min-size pool)) (.setMaxPoolSize (:max-size pool)) (.setMaxConnectionAge (* 1 60 60)) (.setMaxIdleTime (* 1/2 60 60)) (.setMaxIdleTimeExcessConnections (* 5 60)) (.setIdleConnectionTestPeriod (* 10 60)))})

см. здесь для существующих конфигураций.

Ответы [ 2 ]

1 голос
/ 28 января 2020

Чтобы получить любые внутренние исключения, добавьте ConnectionEventListener / StatementEventListener к вашему NewPooledConnection.

0 голосов
/ 30 января 2020

После еще нескольких исследований я понял, что при использовании блока future в Clojure все исключения проглатываются, если только future get rerefed . Таким образом, я мог решить мою проблему, обновив мой defn как ниже и разыменовав future в другом defn позже.

(defn func
([coll name]
(let [ch (async/chan 100)
future-f (future
               (let [i (reduce (fn [i v]
                                 (when (zero? (mod i 10000))
                                   (log/debug " row[" i "]: " name))

                                 (if (nil? v)
                                   (log/warn "skipping")
                                   (async/>!! ch [i v]))
                                 (inc i))
                               0 coll)
                     _ (log/info "End Reading, read" i "rows from" name)]
                 (async/close! ch)))]
 [ch future-f])))
...