Массовое обновление в Spark - Elasticsearch - PullRequest
0 голосов
/ 04 февраля 2019

Я получаю события из темы Кафки и предварительно агрегирую их перед сохранением в Elastic Search.Все, что мне нужно, это способ обновить счетчик моих документов в скрипте upsert и сохранить его в Elastic Search

В настоящее время, чтобы все заработало, я делаю это:

if (!results.collect().isEmpty()) { // save to es
 AggEventEsDocument documentToUpdate = results.collect().get(0);
 String updateScript = "ctx._source.count += " + documentToUpdate.getCount();

 esSettings.put(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, updateScript);
 JavaEsSpark.saveToEs(results, jobConfig.getEsIndexAndType(), esSettings);
    }
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);

Здесь,Я получаю счет только для первого документа, и я хочу сделать следующее:

(я знаю, что это не сработает, я уже получаю «Задача не сериализуемая опция»)

results.foreach(event -> {
 String updateScript = "ctx._source.count += " + event.getCount();     
 esSettings.put(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, updateScript);
 JavaEsSpark.saveToEs(event, jobConfig.getEsIndexAndType(), esSettings);
})

ИЛИ может быть что-то вроде этого:

BulkRequest bulkRequest = new BulkRequest();
results.foreach(event -> {
 UpdateRequest request = new UpdateRequest(jobConfig.getEsIndex(), jobConfig.getEsType(), aggregatedEvent.getPairId());
            Map<String, Object> parameters = Collections.singletonMap("count", aggregatedEvent.getCount());

 Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.count += params.count", parameters);
            request.script(inline);

 String jsonString = aggregatedEvent.toString();
            request.upsert(jsonString, XContentType.JSON);

 bulkRequest.add(request);
});

client.bulk(bulkRequest, RequestOptions.DEFAULT);
...