Обдумав комментарий Клемента, я нашел решение.
Прежде всего, я переключился между двумя модулями io.reactiverse, выбрав версию io.reactiverse.elasticsearch-client без RxJava.
Затем я вернулся к версиям EventBus и Message из io.vertx.axle.
Затем я изменил свой код следующим образом:
ElasticResource
import io.vertx.axle.core.eventbus.EventBus;
import io.vertx.core.json.JsonObject;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.concurrent.ExecutionException;
/**
 * REST resource rooted at {@code /elastic} that forwards a fixed document
 * lookup over the event bus and returns the reply body as JSON.
 */
@Path("/elastic")
@ApplicationScoped
public class ElasticResource {

    /** Event-bus address the lookup request is sent to. */
    private static final String GET_REQUEST_ADDRESS = "QuarkusElasticService.getReq";

    @Inject
    EventBus eventBus;

    /**
     * Handles {@code GET /elastic/bank-es}: sends the lookup request over the
     * event bus and waits for the reply.
     *
     * <p>The calling thread is blocked until the reply future completes.
     *
     * @return the body of the event-bus reply
     * @throws ExecutionException if the reply future completed exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("bank-es")
    public JsonObject greetingVertx() throws ExecutionException, InterruptedException {
        return eventBus.<JsonObject>send(GET_REQUEST_ADDRESS, bankDocumentRequest())
                .toCompletableFuture()
                .get()
                .body();
    }

    /** Builds the request payload: index {@code "bank"}, document id {@code "1"}. */
    private static JsonObject bankDocumentRequest() {
        JsonObject request = new JsonObject();
        request.put("index", "bank");
        request.put("id", "1");
        return request;
    }
}
QuarkusElasticServiceImpl
import com.sourcesense.sisal.socialbetting.dev.example.elastic.service.QuarkusElasticService;
import io.quarkus.vertx.ConsumeEvent;
import io.reactiverse.elasticsearch.client.RestHighLevelClient;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import org.apache.http.HttpHost;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
/**
 * Event-bus consumer that fetches a single Elasticsearch document using the
 * Vert.x-based {@link RestHighLevelClient}.
 *
 * <p>Messages arriving on the {@code "QuarkusElasticService.getReq"} address
 * are handled by {@link #getReq(JsonObject)}.
 */
public class QuarkusElasticServiceImpl implements QuarkusElasticService {

    @Inject
    Vertx vertx;

    private RestHighLevelClient esClient;

    /**
     * Creates the Elasticsearch client once injection is complete. The client
     * is configured for two HTTP hosts: {@code localhost:9200} and
     * {@code localhost:9201}.
     */
    @PostConstruct
    public void init() {
        RestClientBuilder builder = RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http"));
        esClient = RestHighLevelClient.create(vertx, builder);
    }

    /**
     * Asynchronously retrieves a document from Elasticsearch.
     *
     * @param jsonObject request payload; its {@code "index"} and {@code "id"}
     *     string entries select the document to fetch
     * @return a stage completed with the Elasticsearch response mapped to a
     *     {@link JsonObject}, or completed exceptionally if the lookup failed
     */
    @Override
    @ConsumeEvent("QuarkusElasticService.getReq")
    public CompletionStage<JsonObject> getReq(JsonObject jsonObject) {
        // Parameterized (not raw) so that complete(...) is type-checked.
        CompletableFuture<JsonObject> future = new CompletableFuture<>();
        GetRequest getRequest = new GetRequest(
                jsonObject.getString("index"),
                jsonObject.getString("id"));
        esClient.getAsync(getRequest, RequestOptions.DEFAULT, ar -> {
            if (ar.failed()) {
                // Keep the original exception type and message, but attach the
                // underlying failure so the real error is not lost.
                future.completeExceptionally(new Exception("erroraccio", ar.cause()));
            } else {
                future.complete(JsonObject.mapFrom(ar.result()));
            }
        });
        return future;
    }
}