Я пытаюсь объединить vertx с mysql, чтобы провести рефакторинг кода. сначала я использую vertx-jdb c -client, но tps кажется не очень хорошим, около 400 транзакций в секунду, кажется, что столько времени потрачено на ожидание, чтобы получить соединение sql, но я закрыл соединение после фиксации транзакции / rollback, то вместо этого я взял vertx- mysql -client, что похоже, около 480 транзакций в секунду, я много раз просматривал документацию на офисном сайте, следил за примерами кодов, пробовал так много вариантов настройки, так же как номер вертикального экземпляра, обработчик блокировки, blockingexecute, размер пула подключения, параметры TCP, собственный транспорт и т. д. c, но я не могу понять, почему так низкая производительность tps.
в отличие от vertx, я использую springboot webflux, jpa и Schedulers.parallel (), tps до 1300 в секунду, это так странно.
мой старый код использовал springboot undertow, servlet 3.1 и jpa, tps было довольно низким.
vertx qps будет до 27000 в секунду, если без mysql запроса или обработки в моем стресс-тесте.
VertxOptions vertxOptions = new VertxOptions().setPreferNativeTransport(true);
Vertx vertx = Vertx.vertx(vertxOptions);
vertx.registerVerticleFactory(springVerticleFactory);
DeploymentOptions deploymentOptions = new DeploymentOptions().setInstances(CpuCoreSensor.availableProcessors());
vertx.deployVerticle(SpringVerticleFactory.PREFIX + ":" + MainVerticle.class.getName(), deploymentOptions);
@Slf4j
@Component
@Scope(SCOPE_PROTOTYPE)
@RequiredArgsConstructor
public class MainVerticle extends AbstractVerticle {
@Value("${server.port:8080}")
private Integer port;
private JDBCClient jdbcClient;
public void start(Promise<Void> startFuture) throws Exception {
/*Map<String, Object> properties = Maps.newHashMap();
properties.put("maxLifetime", 1700000);
properties.put("cachePrepStmts", true);
properties.put("prepStmtCacheSize", 250);
properties.put("prepStmtCacheSqlLimit", 2048);
properties.put("useServerPrepStmts", true);
properties.put("useLocalSessionState", true);
properties.put("rewriteBatchedStatements", true);
properties.put("cacheResultSetMetadata", true);
properties.put("cacheServerConfiguration", true);
properties.put("elideSetAutoCommits", true);
properties.put("maintainTimeStats", false);
JsonObject config = new JsonObject()
.put("jdbcUrl", "jdbc:mysql://xxxxxx:3306/db?characterEncoding=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=GMT%2B8&sslMode=DISABLED&allowPublicKeyRetrieval=true&rewriteBatchedStatements=true")
.put("provider_class", "io.vertx.ext.jdbc.spi.impl.HikariCPDataSourceProvider")
.put("driverClassName", "com.mysql.cj.jdbc.Driver")
.put("username", "user")
.put("password", "user")
.put("minimumIdle", 10)
.put("maximumPoolSize", 10)
.put("datasource", properties);
jdbcClient = JDBCClient.createShared(vertx, config);*/
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("xxxxxxx")
.setDatabase("db")
.setUser("user")
.setPassword("user")
.setCachePreparedStatements(true)
.setPreparedStatementCacheMaxSize(250)
.setPreparedStatementCacheSqlLimit(2048)
.setTcpFastOpen(true)
.setTcpKeepAlive(true)
.setTcpNoDelay(true)
.setTcpQuickAck(true)
.setReusePort(true);
PoolOptions poolOptions = new PoolOptions().setMaxSize(10);
MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);
Router router = Router.router(vertx);
router.route().method(HttpMethod.POST).method(HttpMethod.PUT).consumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE).handler(BodyHandler.create().setHandleFileUploads(false));
router.post("/api/test/post")
.handler(HTTPRequestValidationHandler.create()
.addFormParam("id", ParameterType.GENERIC_STRING, true)
.addFormParam("partition", ParameterType.GENERIC_STRING, true)
.addFormParam("code", ParameterType.GENERIC_STRING, true)
.addFormParam("amount", ParameterType.DOUBLE, true)
.addFormParam("num", ParameterType.GENERIC_STRING, true))
.handler(routingContext -> {
String id = routingContext.request().getFormAttribute("id");
String partition = routingContext.request().getFormAttribute("partition");
String code = routingContext.request().getFormAttribute("code");
BigDecimal amount = new BigDecimal(routingContext.request().getFormAttribute("amount"));
String num = routingContext.request().getFormAttribute("num");
client.begin(transactionAsyncResult -> {
if (transactionAsyncResult.failed()) {
routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
return;
}
Transaction tx = transactionAsyncResult.result();
tx.preparedQuery("select id from test_account where customer_id = ? and partition_id = ? and code = ? and user_type = ? and trans_type = ?")
.execute(Tuple.of(id, id, code, "xxx", "xxx"), rowSetAsyncResult -> {
if (rowSetAsyncResult.failed()) {
routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
return;
}
String id = rowSetAsyncResult.result().iterator().next().getString(0);
Tuple tuple = Tuple.of(amount, amount, id, amount);
tx.preparedQuery("UPDATE test_account SET available = available - ?, non_avalibale = non_avalibale + ? WHERE id = ? and available >= ?").execute(tuple, rowSetAsyncResult1 -> {
if (rowSetAsyncResult1.failed()) {
routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
return;
}
int rowcount = rowSetAsyncResult1.result().rowCount();
if (rowcount == 0) {
routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
return;
}
List<Tuple> jsonArrayList = Lists.newLinkedList();
Tuple jsonArray = getTuple();
jsonArrayList.add(jsonArray);
jsonArray = getTuple();
jsonArrayList.add(jsonArray);
tx.preparedQuery("INSERT INTO test_transaction_log (id, created_by, created_time, last_modified_by, last_modified_time, version, account_type, trans_type, qty_begin, qty_end, us_type, chg_type, code, cu_id, in_address, out_address, partition, record_type, remark, status, trans_amount, trans_number, tx_id, website, trans_type) " +
"VALUES (?, 'system', now(), 'system', now(), 1, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, ?, ?, ?, 'SUCCESS', ?, ?, NULL, 'zh', ?)").executeBatch(jsonArrayList, listAsyncResult -> {
if (listAsyncResult.failed()) {
tx.rollback(rollbackcontext -> {
routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
});
} else {
tx.commit(commitcontect -> {
routingContext.response().end(Json.encode(TestResult.SUCCESS));
});
}
});
});
});
});
/*jdbcClient.getConnection(result -> {
if (result.failed()) {
routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
return;
}
SQLConnection conn = result.result();
conn.setAutoCommit(false, voidAsyncResult -> {
if (voidAsyncResult.failed()) {
conn.close(h -> {
routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
});
return;
}
log.info("time spend -2 " + Thread.currentThread().getName() + "--" + (System.currentTimeMillis() - start));
JsonArray params = new JsonArray().add(id).add().).add(code).add("xxx").add("yyy");
conn.querySingleWithParams("select id from test_account where id = ? and partition_id = ? and code = ? and user_type = ? and trans_type = ?", params, jsonArrayAsyncResult -> {
if (jsonArrayAsyncResult.failed()) {
conn.close(h -> {
routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
});
return;
}
String id = jsonArrayAsyncResult.result().getString(0);
JsonArray uparams = new JsonArray().add(amount.doubleValue()).add(amount.doubleValue()).add(id).add(amount.doubleValue());
conn.updateWithParams("UPDATE test_account SET available = available - ?, non_avalibale = non_avalibale + ? WHERE id = ? and available >= ?", uparams, updateResultAsyncResult -> {
if (updateResultAsyncResult.failed()) {
conn.rollback(rollbackcontext -> {
conn.close(h -> {
routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
});
});
return;
}
log.info("time spend -4 " + Thread.currentThread().getName() + "--" + (System.currentTimeMillis() - start));
List<JsonArray> jsonArrayList = Lists.newLinkedList();
JsonArray jsonArray = getEntityArray();
jsonArrayList.add(jsonArray);
jsonArray = getEntityArray();
jsonArrayList.add(jsonArray);
conn.batchWithParams("INSERT INTO test_transaction_log (id, created_by, created_time, last_modified_by, last_modified_time, version, account_type, trans_type, qty_begin, qty_end, us_type, chg_type, code, cu_id, in_address, out_address, partition, record_type, remark, status, trans_amount, trans_number, tx_id, website, trans_type) " +
"VALUES (?, 'system', now(), 'system', now(), 1, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, ?, ?, ?, 'SUCCESS', ?, ?, NULL, 'zh', ?)", jsonArrayList, listAsyncResult -> {
if (listAsyncResult.failed()) {
conn.rollback(rollbackcontext -> {
conn.close(h -> {
routingContext.response().setStatusCode(500).end(Json.encode(TestResult.fail(Errors.OPERATE_FAILED)));
});
});
} else {
conn.commit(commitcontect -> {
conn.close(h -> {
routingContext.response().end(Json.encode(TestResult.SUCCESS));
});
});
}
});
});
});
});
});*/
});
router.route().failureHandler(routingContext -> {
log.warn("handler error", routingContext.failure());
routingContext.response().setStatusCode(500).end(Json.encode(Exceptions.map(routingContext.failure(), false)));
});
HttpServerOptions options = new HttpServerOptions()
.setTcpFastOpen(true).setTcpNoDelay(true)
.setTcpQuickAck(true).setReusePort(true);
vertx.createHttpServer(options).requestHandler(router).listen(port, http -> {
if (http.succeeded()) {
startFuture.complete();
} else {
log.error("http server start failed", http.cause());
startFuture.fail(http.cause());
}
});
}
}