Может ли кто-нибудь помочь мне исправить эту программу: ReqlDriverError: насос ответа закрыт - PullRequest
1 голос
/ 30 мая 2020

Я изучаю rethinkdb и получаю следующую ошибку. подключаю rethinkdb, подписываюсь на changefeed success. После того, как я запустил свое приложение, я мог добавить запись в таблицу success, но через 15/20 минут, если я добавлю запись, выйдет ошибка

c.r.g.e.ReqlDriverError: Response pump closed.
    at c.r.n.DefaultConnectionFactory$ThreadResponsePump.await(DefaultConnectionFactory.java:214)
    at c.r.net.Connection.sendQuery(Connection.java:349)
    at c.r.net.Connection.runQuery(Connection.java:384)
    at c.r.net.Connection.runAsync(Connection.java:166)
    at c.r.net.Connection.run(Connection.java:185). 

Мой код здесь:

@Override
public void run () {
    while (true) {
        try {
            logger.info ("START RETHINKDB & CHANGEFEED");
            conn = r.connection ()
                    .hostname (configParam.hostRethink)
                    .port (configParam.portRethink)
                    .db (configParam.dbRethink)
                    .user (configParam.userRethink, configParam.passRethink)
                    .connect ();

            // Changefeeds: subscribe to a feed by calling changes on a table
            Result <Object> result = r.table ("order")
                    .filter (
                            new ReqlFunction1 () {
                                @Override
                                public Object apply (ReqlExpr row) {
                                    return row.g ("status"). eq ("APPROVED");
                                }
                            }). changes (). optArg ("include_types", true) .run (conn);
            for (Object change: result) {
                logger.info (change.toString ());
                superMarketService.updateApprovedSuperMarketOrder (change);
            }
            logger.info ("END RETHINKDB & CHANGEFEED SUCCESS");
        } catch (Exception e) {
            e.printStackTrace ();
        }
    }
}

public void addOrder (JSONSuperMarket json) {
    if (conn == null) {
        logger.error ("=============== RETHINK DIE =========");
    }

    Result <Object> result = r.table ("order"). Insert (r.array (
            r.hashMap ("order_code", json.getOrderCode ())
                    .with ("cash_id", json.getCashId ())
                    .with ("merchant_id", json.getMerchantId ())
                    .with ("amount", json.getAmount ())
                    .with ("status", "PENDING")
                    .with ("description", json.getDescription ())
                    .with ("created_date", r.now (). inTimezone ("+ 07:00"). toIso8601 ())
    )). run (conn);
    logger.info (result.toString ());
}

Есть ли ошибка в моем коде, пожалуйста, помогите мне :( Я использую Java 8 + RethinkDb 2.4.2.

...