Reactive Spring Boot API-оболочка Elynsearch asyn c массовая индексация - PullRequest
1 голос
/ 02 апреля 2020

Я разрабатываю прототип для нового проекта. Идея заключается в предоставлении микросервиса Reactive Spring Boot для массового индексирования документов в Elasticsearch. Elasticsearch предоставляет High Level Rest Client, который предоставляет метод Asyn c для массовой обработки запросов на индексирование. Asyn c доставляет обратные вызовы с использованием слушателей, упомянутых здесь . Обратные вызовы получают индексные ответы (по запросам) в пакетном режиме. Я пытаюсь отправить этот ответ клиенту как Flux. Я придумал что-то основанное на этом посте в блоге .

Контроллер

@RestController
public class AppController {

    @SuppressWarnings("unchecked")
    @RequestMapping(value = "/test3", method = RequestMethod.GET)
    public Flux<String> index3() {
        ElasticAdapter es = new ElasticAdapter();
        JSONObject json = new JSONObject();
        json.put("TestDoc", "Stack123");
        Flux<String>  fluxResponse = es.bulkIndex(json);
        return fluxResponse;
    }

ElasticAdapter

@Component
class ElasticAdapter {
String indexName = "test2"; 
    private final RestHighLevelClient client;
    private final ObjectMapper mapper;
    private int processed = 1;

    Flux<String> bulkIndex(JSONObject doc) {
        return bulkIndexDoc(doc)
                .doOnError(e -> System.out.print("Unable to index {}" + doc+ e));
    }

    private Flux<String> bulkIndexDoc(JSONObject doc) {
        return Flux.create(sink -> {
            try {
                doBulkIndex(doc, bulkListenerToSink(sink));
            } catch (JsonProcessingException e) {
                sink.error(e);
            }
        });
    }


    private void doBulkIndex(JSONObject doc, BulkProcessor.Listener listener) throws JsonProcessingException {

        System.out.println("Going to submit index request");
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                (request, bulkListener) ->
                    client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
                    BulkProcessor.Builder builder =
                            BulkProcessor.builder(bulkConsumer, listener);
        builder.setBulkActions(10); 
        BulkProcessor bulkProcessor = builder.build();
        // Submitting 5,000 index requests ( repeating same JSON)
        for (int i = 0; i < 5000; i++) {
            IndexRequest indexRequest = new IndexRequest(indexName, "person", i+1+"");
             String json = doc.toJSONString();
            indexRequest.source(json, XContentType.JSON);
            bulkProcessor.add(indexRequest);
        }
        System.out.println("Submitted all docs
    }


    private BulkProcessor.Listener bulkListenerToSink(FluxSink<String> sink) {
        return new BulkProcessor.Listener() {

            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            @SuppressWarnings("unchecked")
            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {

                for (BulkItemResponse bulkItemResponse : response) {
                    JSONObject json = new JSONObject();
                    json.put("id", bulkItemResponse.getResponse().getId());
                    json.put("status", bulkItemResponse.getResponse().getResult

                    sink.next(json.toJSONString()); 
                    processed++;
                }
                if(processed >= 5000) {
                    sink.complete();
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                failure.printStackTrace();
                sink.error(failure);
            }
        };
    }


    public ElasticAdapter() {
    // Logic to initialize  Elasticsearch Rest Client 
    }
}

Я использовал FluxSink для создания потока ответов для отправки обратно Клиенту. На данный момент я понятия не имею, правильное ли это или нет.

Я ожидаю, что вызывающий клиент должен получить ответы партиями по 10 (потому что массовый процессор обрабатывает их партиями по 10 - builder.setBulkActions(10);) , Я пытался использовать конечную точку, используя Spring Webflix Client. Но не в состоянии решить это. Это то, что я пробовал

WebClient

public class FluxClient {

    public static void main(String[] args) {
        WebClient client = WebClient.create("http://localhost:8080");
        Flux<String> responseFlux = client.get()
                  .uri("/test3")
                  .retrieve()
                  .bodyToFlux(String.class);
        responseFlux.subscribe(System.out::println);
    }
}

На консоли ничего не печатается, как я ожидал. Я пытался использовать System.out.println(responseFlux.blockFirst());. Он печатает все ответы как один пакет в конце, а не в пакетах в.

Если мой подход правильный, как правильно его потреблять? Для решения, на мой взгляд, этот клиент будет находиться в другом Webapp.

Примечания. Мое понимание API Reactor ограничено. Используемая версия упругого поиска: 6.8.

1 Ответ

2 голосов
/ 03 апреля 2020

Итак, внесены следующие изменения в ваш код.

В ElasticAdapter,

public Flux<Object> bulkIndex(JSONObject doc) {
    return bulkIndexDoc(doc)
            .subscribeOn(Schedulers.elastic(), true)
            .doOnError(e -> System.out.print("Unable to index {}" + doc+ e));
}

Вызвана подписка (Scheduler, requestOnSeparateThread) на Flux, узнал об этом от, https://github.com/spring-projects/spring-framework/issues/21507

В FluxClient,

Flux<String> responseFlux = client.get()
              .uri("/test3")
              .headers(httpHeaders -> {
                  httpHeaders.set("Accept", "text/event-stream");
              })
              .retrieve()
              .bodyToFlux(String.class);
responseFlux.delayElements(Duration.ofSeconds(1)).subscribe(System.out::println);

Добавлен заголовок «Принять» как «поток текста / событий» и задержанные элементы Flux.

С Вышеуказанные изменения смогли получить ответ в режиме реального времени с сервера.

...