Я получаю события из темы Кафки и предварительно агрегирую их перед сохранением в Elastic Search.Все, что мне нужно, это способ обновить счетчик моих документов в скрипте upsert и сохранить его в Elastic Search
В настоящее время, чтобы все заработало, я делаю это:
if (!results.collect().isEmpty()) { // save to es
AggEventEsDocument documentToUpdate = results.collect().get(0);
String updateScript = "ctx._source.count += " + documentToUpdate.getCount();
esSettings.put(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, updateScript);
JavaEsSpark.saveToEs(results, jobConfig.getEsIndexAndType(), esSettings);
}
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
Здесь,Я получаю счет только для первого документа, и я хочу сделать следующее:
(я знаю, что это не сработает, я уже получаю «Задача не сериализуемая опция»)
results.foreach(event -> {
String updateScript = "ctx._source.count += " + event.getCount();
esSettings.put(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, updateScript);
JavaEsSpark.saveToEs(event, jobConfig.getEsIndexAndType(), esSettings);
})
ИЛИ может быть что-то вроде этого:
BulkRequest bulkRequest = new BulkRequest();
results.foreach(event -> {
UpdateRequest request = new UpdateRequest(jobConfig.getEsIndex(), jobConfig.getEsType(), aggregatedEvent.getPairId());
Map<String, Object> parameters = Collections.singletonMap("count", aggregatedEvent.getCount());
Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.count += params.count", parameters);
request.script(inline);
String jsonString = aggregatedEvent.toString();
request.upsert(jsonString, XContentType.JSON);
bulkRequest.add(request);
});
client.bulk(bulkRequest, RequestOptions.DEFAULT);