vertx с mysql имеют низкую производительность tps - PullRequest
0 голосов
/ 25 мая 2020

Я пытаюсь объединить vertx с mysql, чтобы провести рефакторинг кода. сначала я использую vertx-jdb c -client, но tps кажется не очень хорошим, около 400 транзакций в секунду, кажется, что столько времени потрачено на ожидание, чтобы получить соединение sql, но я закрыл соединение после фиксации транзакции / rollback, то вместо этого я взял vertx- mysql -client, что похоже, около 480 транзакций в секунду, я много раз просматривал документацию на офисном сайте, следил за примерами кодов, пробовал так много вариантов настройки, так же как номер вертикального экземпляра, обработчик блокировки, blockingexecute, размер пула подключения, параметры TCP, собственный транспорт и т. д. c, но я не могу понять, почему так низкая производительность tps.

в отличие от vertx, я использую springboot webflux, jpa и Schedulers.parallel (), tps до 1300 в секунду, это так странно.

мой старый код использовал springboot undertow, servlet 3.1 и jpa, tps было довольно низким.

vertx qps будет до 27000 в секунду, если без mysql запроса или обработки в моем стресс-тесте.

VertxOptions vertxOptions = new VertxOptions().setPreferNativeTransport(true);
Vertx vertx = Vertx.vertx(vertxOptions);
vertx.registerVerticleFactory(springVerticleFactory);

DeploymentOptions deploymentOptions = new DeploymentOptions().setInstances(CpuCoreSensor.availableProcessors());
vertx.deployVerticle(SpringVerticleFactory.PREFIX + ":" + MainVerticle.class.getName(), deploymentOptions);

@Slf4j
@Component
@Scope(SCOPE_PROTOTYPE)
@RequiredArgsConstructor
public class MainVerticle extends AbstractVerticle {
    @Value("${server.port:8080}")
    private Integer port;
    private JDBCClient jdbcClient;

    public void start(Promise<Void> startFuture) throws Exception {
        /*Map<String, Object> properties = Maps.newHashMap();
        properties.put("maxLifetime", 1700000);
        properties.put("cachePrepStmts", true);
        properties.put("prepStmtCacheSize", 250);
        properties.put("prepStmtCacheSqlLimit", 2048);
        properties.put("useServerPrepStmts", true);
        properties.put("useLocalSessionState", true);
        properties.put("rewriteBatchedStatements", true);
        properties.put("cacheResultSetMetadata", true);
        properties.put("cacheServerConfiguration", true);
        properties.put("elideSetAutoCommits", true);
        properties.put("maintainTimeStats", false);

        JsonObject config = new JsonObject()
                .put("jdbcUrl", "jdbc:mysql://xxxxxx:3306/db?characterEncoding=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=GMT%2B8&sslMode=DISABLED&allowPublicKeyRetrieval=true&rewriteBatchedStatements=true")
                .put("provider_class", "io.vertx.ext.jdbc.spi.impl.HikariCPDataSourceProvider")
                .put("driverClassName", "com.mysql.cj.jdbc.Driver")
                .put("username", "user")
                .put("password", "user")
                .put("minimumIdle", 10)
                .put("maximumPoolSize", 10)
                .put("datasource", properties);
        jdbcClient = JDBCClient.createShared(vertx, config);*/

        MySQLConnectOptions connectOptions = new MySQLConnectOptions()
                .setPort(3306)
                .setHost("xxxxxxx")
                .setDatabase("db")
                .setUser("user")
                .setPassword("user")
                .setCachePreparedStatements(true)
                .setPreparedStatementCacheMaxSize(250)
                .setPreparedStatementCacheSqlLimit(2048)
                .setTcpFastOpen(true)
                .setTcpKeepAlive(true)
                .setTcpNoDelay(true)
                .setTcpQuickAck(true)
                .setReusePort(true);
        PoolOptions poolOptions = new PoolOptions().setMaxSize(10);
        MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);

        Router router = Router.router(vertx);
        router.route().method(HttpMethod.POST).method(HttpMethod.PUT).consumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE).handler(BodyHandler.create().setHandleFileUploads(false));

        router.post("/api/test/post")
                .handler(HTTPRequestValidationHandler.create()
                        .addFormParam("id", ParameterType.GENERIC_STRING, true)
                        .addFormParam("partition", ParameterType.GENERIC_STRING, true)
                        .addFormParam("code", ParameterType.GENERIC_STRING, true)
                        .addFormParam("amount", ParameterType.DOUBLE, true)
                        .addFormParam("num", ParameterType.GENERIC_STRING, true))
                .handler(routingContext -> {
                    String id = routingContext.request().getFormAttribute("id");
                    String partition = routingContext.request().getFormAttribute("partition");
                    String code = routingContext.request().getFormAttribute("code");
                    BigDecimal amount = new BigDecimal(routingContext.request().getFormAttribute("amount"));
                    String num = routingContext.request().getFormAttribute("num");

                    client.begin(transactionAsyncResult -> {
                        if (transactionAsyncResult.failed()) {
                            routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
                            return;
                        }

                        Transaction tx = transactionAsyncResult.result();
                        tx.preparedQuery("select id from test_account where customer_id = ? and partition_id = ? and code = ? and user_type = ? and trans_type = ?")
                                .execute(Tuple.of(id, id, code, "xxx", "xxx"), rowSetAsyncResult -> {
                                    if (rowSetAsyncResult.failed()) {
                                        routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
                                        return;
                                    }

                                    String id = rowSetAsyncResult.result().iterator().next().getString(0);
                                    Tuple tuple = Tuple.of(amount, amount, id, amount);
                                    tx.preparedQuery("UPDATE test_account SET available = available - ?, non_avalibale = non_avalibale + ? WHERE id = ? and available >= ?").execute(tuple, rowSetAsyncResult1 -> {
                                        if (rowSetAsyncResult1.failed()) {
                                            routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
                                            return;
                                        }
                                        int rowcount = rowSetAsyncResult1.result().rowCount();
                                        if (rowcount == 0) {
                                            routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));                                                                                              
                                            return;
                                        }

                                        List<Tuple> jsonArrayList = Lists.newLinkedList();
                                        Tuple jsonArray = getTuple();
                                        jsonArrayList.add(jsonArray);
                                        jsonArray = getTuple();
                                        jsonArrayList.add(jsonArray);
                                        tx.preparedQuery("INSERT INTO test_transaction_log (id, created_by, created_time, last_modified_by, last_modified_time, version, account_type, trans_type, qty_begin, qty_end, us_type, chg_type, code, cu_id, in_address, out_address, partition, record_type, remark, status, trans_amount, trans_number, tx_id, website, trans_type) " +
                                                "VALUES (?, 'system', now(), 'system', now(), 1, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, ?, ?, ?, 'SUCCESS', ?, ?, NULL, 'zh', ?)").executeBatch(jsonArrayList, listAsyncResult -> {
                                            if (listAsyncResult.failed()) {
                                                tx.rollback(rollbackcontext -> {
                                                    routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
                                                });
                                            } else {
                                                tx.commit(commitcontect -> {
                                                    routingContext.response().end(Json.encode(TestResult.SUCCESS));
                                                });
                                            }
                                        });
                                    });
                                });
                    });

                    /*jdbcClient.getConnection(result -> {
                        if (result.failed()) {
                            routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
                            return;
                        }

                        SQLConnection conn = result.result();
                        conn.setAutoCommit(false, voidAsyncResult -> {
                            if (voidAsyncResult.failed()) {
                                conn.close(h -> {
                                    routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
                                });
                                return;
                            }

                            log.info("time spend -2 " + Thread.currentThread().getName() + "--" + (System.currentTimeMillis() - start));
                            JsonArray params = new JsonArray().add(id).add().).add(code).add("xxx").add("yyy");
                            conn.querySingleWithParams("select id from test_account where id = ? and partition_id = ? and code = ? and user_type = ? and trans_type = ?", params, jsonArrayAsyncResult -> {
                                if (jsonArrayAsyncResult.failed()) {
                                    conn.close(h -> {
                                        routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
                                    });
                                    return;
                                }
                                String id = jsonArrayAsyncResult.result().getString(0);
                                JsonArray uparams = new JsonArray().add(amount.doubleValue()).add(amount.doubleValue()).add(id).add(amount.doubleValue());
                                conn.updateWithParams("UPDATE test_account SET available = available - ?, non_avalibale = non_avalibale + ? WHERE id = ? and available >= ?", uparams, updateResultAsyncResult -> {
                                    if (updateResultAsyncResult.failed()) {
                                        conn.rollback(rollbackcontext -> {
                                            conn.close(h -> {
                                                routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
                                            });
                                        });
                                        return;
                                    }

                                    log.info("time spend -4 " + Thread.currentThread().getName() + "--" + (System.currentTimeMillis() - start));
                                    List<JsonArray> jsonArrayList = Lists.newLinkedList();
                                    JsonArray jsonArray = getEntityArray();
                                    jsonArrayList.add(jsonArray);
                                    jsonArray = getEntityArray();
                                    jsonArrayList.add(jsonArray);

                                    conn.batchWithParams("INSERT INTO test_transaction_log (id, created_by, created_time, last_modified_by, last_modified_time, version, account_type, trans_type, qty_begin, qty_end, us_type, chg_type, code, cu_id, in_address, out_address, partition, record_type, remark, status, trans_amount, trans_number, tx_id, website, trans_type) " +
                                            "VALUES (?, 'system', now(), 'system', now(), 1, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, ?, ?, ?, 'SUCCESS', ?, ?, NULL, 'zh', ?)", jsonArrayList, listAsyncResult -> {
                                        if (listAsyncResult.failed()) {
                                            conn.rollback(rollbackcontext -> {
                                                conn.close(h -> {
                                                    routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
                                                });
                                            });
                                        } else {
                                            conn.commit(commitcontect -> {
                                                conn.close(h -> {
                                                    routingContext.response().end(Json.encode(TestResult.SUCCESS));
                                                });
                                            });
                                        }
                                    });
                                });
                            });
                        });
                    });*/
                });

        router.route().failureHandler(routingContext -> {
            log.warn("handler error", routingContext.failure());
            routingContext.response().setStatusCode(500).end(Json.encode(Exceptions.map(routingContext.failure(), false)));
        });

        HttpServerOptions options = new HttpServerOptions()
                .setTcpFastOpen(true).setTcpNoDelay(true)
                .setTcpQuickAck(true).setReusePort(true);
        vertx.createHttpServer(options).requestHandler(router).listen(port, http -> {
            if (http.succeeded()) {
                startFuture.complete();
            } else {
                log.error("http server start failed", http.cause());
                startFuture.fail(http.cause());
            }
        });
    }
}

1 Ответ

0 голосов
/ 27 мая 2020

Это не окончательное решение, но есть две проблемы c, которые я вижу в этом коде:

  1. Пул соединений слишком мал
  2. Слишком много работы сделано EventL oop

Для первой проблемы обратите внимание, что вы выполняете три отдельные операции БД: выбор, обновление и вставку, используя одну и ту же транзакцию, то есть одно и то же соединение. Это одно из возможных узких мест, с которыми вы столкнетесь.

Что касается второй проблемы, я бы посоветовал разбить это как минимум на 3 отдельные статьи, обменивающиеся данными по шине событий.

  1. RequestHandler
  2. Вертикаль для выбора правильной строки
  3. Вертикаль для выполнения операций с этой строкой

Это также должно позволить вам лучше чередовать работу, произведенную по разным запросам.

...