Я нашел похожий вопрос - здесь, в StackOverflow - к сожалению, я забыл сохранить ссылку, поэтому я просто интерпретирую здесь в этом ответе.
Я использовал AtomicReference в качестве типа bean-объекта повторно инициализированный объект BulkRequest вметод смыва.Затем я добавил синхронизацию к вызовам BulkRequest.add
, так как он использует список в фоновом режиме.
Обратите внимание, что это решение несколько грязное и было описано так же в ссылочном ответе - но оно РАБОТАЕТ для меня ..
Код:
Боб
@Bean
public AtomicReference<BulkRequest> bulkRequest() {
return new AtomicReference<BulkRequest>(new BulkRequest());
}
Контроллер:
@Autowired
private AtomicReference<BulkRequest> bulkRequest;
@PostMapping
public void index(@RequestBody String o) {
synchronized (bulkRequest.get()) {
bulkRequest.get().add(new IndexRequest(config.INDEX, config.TYPE).source(o, XContentType.JSON));
}
}
@DeleteMapping(path="/{id}")
public void delete(@PathVariable String id) {
synchronized (bulkRequest.get()) {
bulkRequest.get().add(new DeleteRequest(config.INDEX, config.TYPE, id));
}
}
@PutMapping(path="/{id}")
public void update(@PathVariable String id, @RequestBody String o) {
synchronized (bulkRequest.get()) {
bulkRequest.get().add(new UpdateRequest(config.INDEX, config.TYPE, id).doc(o, XContentType.JSON));
}
}
@PostMapping(path = "/flush")
public String flush() throws Exception {
synchronized (bulkRequest.get()) {
String result = bulkService.flush(bulkRequest);
bulkRequest.set(new BulkRequest());
return result;
}
}
BulkService
@Service
public class BulkService {
@Autowired
private RestHighLevelClient client;
public String flush( AtomicReference<BulkRequest> bulkRequest) throws Exception {
BulkResponse bulkResponse = client.bulk(bulkRequest.get(), RequestOptions.DEFAULT);
if(bulkResponse.hasFailures()) {
return bulkResponse.buildFailureMessage();
}
else {
return "All operations in the bulk request proceeded successfully!";
}
}
}