Можно ли использовать API delete_by_query в интеграции Spark SpleaseSearch Streaming?
Мой документ ES выглядит следующим образом:
{"tag": <String>, "value": <Integer>}
Мой пример использования заключается в том, что мне нужно увеличить значение на 1 для всех вхождений определенного тега.
В основном я хочу сделать что-то вроде:
POST <my_index>/_update_by_query
{
"script": {
"source": "ctx._source.value++",
"lang": "painless"
},
"query": {
"term": {
"tag": <TAG>
}
}
}
Попробовал это в искре (код ниже):
data.writeStream
.outputMode("append")
.format("org.elasticsearch.spark.sql")
.option("es.nodes", "localhost")
.option("es.port", "9200")
.option("es.resource","test/_doc")
.option("checkpointLocation", "/var/log/es_cp")
.option("es.mapping.id", "_id")
.option("es.update.script.inline", "ctx._source.value++")
.option("es.write.operation", "upsert")
.start()
.awaitTermination()
Но для этого требуется идентификатор. Я используюasticsearch-had oop 6.8.6 для записи в ES-приемник.