SpringBoot - эластичный массовый запрос - синхронизация контроллера добавления и сброса - PullRequest
0 голосов
/ 20 февраля 2019


Я использую высокоуровневый клиент покоя для подключения кasticsearch 6.5 из приложения весенней загрузки.

Я хочу создать контроллер с методами добавления команд в массовый запрос и методом сброса (фактическивыполнить) операцию массового запроса.

Я кодировал ее так:
BulkRequest bean - обратите внимание на одноэлементную область действия

@Bean
public BulkRequest bulkRequest() {
  return new BulkRequest();
}

Bulk Controller

@RestController
@RequestMapping("/bulk")
public class BulkController {

    @Autowired
    private BulkRequest bulkRequest;

    @Autowired
    RestHighLevelClient client;

    @PostMapping
    public void index(@RequestBody String o) {
        bulkRequest.add(new IndexRequest(config.INDEX, config.TYPE).source(o, XContentType.JSON));
    }

    @PostMapping(path = "/flush")
    public String flush() throws Exception {
        BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);

        if(bulkResponse.hasFailures()) {
          return bulkResponse.buildFailureMessage();
        }
        else {
          return "All operations in the bulk request proceeded successfully!";
        }
    }


Теперь вопросы:
- это метод bulkRequest.add, синхронизированный в области действия компонента BulkRequest (в данном случае singleton)?

- как запустить новый BulkRequest после вызова метода BulkController.flush?Нужно ли создавать экземпляр нового bean-компонента BulkRequest и каким-то образом передавать его в среду bean-компонента?

- что нужно изменить в случае bean-компонента BulkRequest с областью действия @session?

1 Ответ

0 голосов
/ 20 февраля 2019

Я нашел похожий вопрос - здесь, в StackOverflow - к сожалению, я забыл сохранить ссылку, поэтому я просто интерпретирую здесь в этом ответе.

Я использовал AtomicReference в качестве типа bean-объекта повторно инициализированный объект BulkRequest вметод смыва.Затем я добавил синхронизацию к вызовам BulkRequest.add, так как он использует список в фоновом режиме.

Обратите внимание, что это решение несколько грязное и было описано так же в ссылочном ответе - но оно РАБОТАЕТ для меня ..

Код:
Боб

    @Bean
    public AtomicReference<BulkRequest> bulkRequest() {
      return new AtomicReference<BulkRequest>(new BulkRequest());
    }

Контроллер:

        @Autowired
        private AtomicReference<BulkRequest> bulkRequest;

        @PostMapping
        public void index(@RequestBody String o) {
            synchronized (bulkRequest.get()) {
                bulkRequest.get().add(new IndexRequest(config.INDEX, config.TYPE).source(o, XContentType.JSON));
            }
        }

        @DeleteMapping(path="/{id}")
        public void delete(@PathVariable String id) {
            synchronized (bulkRequest.get()) {
                bulkRequest.get().add(new DeleteRequest(config.INDEX, config.TYPE, id));
            }
        }

        @PutMapping(path="/{id}")
        public void update(@PathVariable String id, @RequestBody String o) {
            synchronized (bulkRequest.get()) {
                bulkRequest.get().add(new UpdateRequest(config.INDEX, config.TYPE, id).doc(o, XContentType.JSON));
            }
        }

 @PostMapping(path = "/flush")
    public String flush() throws Exception {
        synchronized (bulkRequest.get()) {
            String result = bulkService.flush(bulkRequest);

            bulkRequest.set(new BulkRequest());

            return result;
        }
    }

BulkService

@Service
public class BulkService {

    @Autowired
    private RestHighLevelClient client;

    public String flush( AtomicReference<BulkRequest> bulkRequest) throws Exception {

        BulkResponse bulkResponse = client.bulk(bulkRequest.get(), RequestOptions.DEFAULT);

        if(bulkResponse.hasFailures()) {
            return bulkResponse.buildFailureMessage();
        }
        else {
            return "All operations in the bulk request proceeded successfully!";
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...