Я разрабатываю прототип для нового проекта. Идея заключается в предоставлении микросервиса Reactive Spring Boot для массового индексирования документов в Elasticsearch. Elasticsearch предоставляет High Level Rest Client, который предоставляет метод Asyn c для массовой обработки запросов на индексирование. Asyn c доставляет обратные вызовы с использованием слушателей, упомянутых здесь . Обратные вызовы получают индексные ответы (по запросам) в пакетном режиме. Я пытаюсь отправить этот ответ клиенту как Flux. Я придумал что-то основанное на этом посте в блоге .
Контроллер
@RestController
public class AppController {
@SuppressWarnings("unchecked")
@RequestMapping(value = "/test3", method = RequestMethod.GET)
public Flux<String> index3() {
ElasticAdapter es = new ElasticAdapter();
JSONObject json = new JSONObject();
json.put("TestDoc", "Stack123");
Flux<String> fluxResponse = es.bulkIndex(json);
return fluxResponse;
}
ElasticAdapter
@Component
class ElasticAdapter {
String indexName = "test2";
private final RestHighLevelClient client;
private final ObjectMapper mapper;
private int processed = 1;
Flux<String> bulkIndex(JSONObject doc) {
return bulkIndexDoc(doc)
.doOnError(e -> System.out.print("Unable to index {}" + doc+ e));
}
private Flux<String> bulkIndexDoc(JSONObject doc) {
return Flux.create(sink -> {
try {
doBulkIndex(doc, bulkListenerToSink(sink));
} catch (JsonProcessingException e) {
sink.error(e);
}
});
}
private void doBulkIndex(JSONObject doc, BulkProcessor.Listener listener) throws JsonProcessingException {
System.out.println("Going to submit index request");
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BulkProcessor.Builder builder =
BulkProcessor.builder(bulkConsumer, listener);
builder.setBulkActions(10);
BulkProcessor bulkProcessor = builder.build();
// Submitting 5,000 index requests ( repeating same JSON)
for (int i = 0; i < 5000; i++) {
IndexRequest indexRequest = new IndexRequest(indexName, "person", i+1+"");
String json = doc.toJSONString();
indexRequest.source(json, XContentType.JSON);
bulkProcessor.add(indexRequest);
}
System.out.println("Submitted all docs
}
private BulkProcessor.Listener bulkListenerToSink(FluxSink<String> sink) {
return new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
}
@SuppressWarnings("unchecked")
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
for (BulkItemResponse bulkItemResponse : response) {
JSONObject json = new JSONObject();
json.put("id", bulkItemResponse.getResponse().getId());
json.put("status", bulkItemResponse.getResponse().getResult
sink.next(json.toJSONString());
processed++;
}
if(processed >= 5000) {
sink.complete();
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
failure.printStackTrace();
sink.error(failure);
}
};
}
public ElasticAdapter() {
// Logic to initialize Elasticsearch Rest Client
}
}
Я использовал FluxSink для создания потока ответов для отправки обратно Клиенту. На данный момент я понятия не имею, правильное ли это или нет.
Я ожидаю, что вызывающий клиент должен получить ответы партиями по 10 (потому что массовый процессор обрабатывает их партиями по 10 - builder.setBulkActions(10);
) , Я пытался использовать конечную точку, используя Spring Webflix Client. Но не в состоянии решить это. Это то, что я пробовал
WebClient
public class FluxClient {
public static void main(String[] args) {
WebClient client = WebClient.create("http://localhost:8080");
Flux<String> responseFlux = client.get()
.uri("/test3")
.retrieve()
.bodyToFlux(String.class);
responseFlux.subscribe(System.out::println);
}
}
На консоли ничего не печатается, как я ожидал. Я пытался использовать System.out.println(responseFlux.blockFirst());
. Он печатает все ответы как один пакет в конце, а не в пакетах в.
Если мой подход правильный, как правильно его потреблять? Для решения, на мой взгляд, этот клиент будет находиться в другом Webapp.
Примечания. Мое понимание API Reactor ограничено. Используемая версия упругого поиска: 6.8.