SparkStream с Elasticsearch Sink Upsert (update_by_query) - PullRequest
0 голосов
/ 27 апреля 2020

Можно ли использовать API delete_by_query в интеграции Spark SpleaseSearch Streaming?

Мой документ ES выглядит следующим образом:

{"tag": <String>, "value": <Integer>}

Мой пример использования заключается в том, что мне нужно увеличить значение на 1 для всех вхождений определенного тега.

В основном я хочу сделать что-то вроде:

POST <my_index>/_update_by_query
{
  "script": {
    "source": "ctx._source.value++",
    "lang": "painless"
  },
  "query": {
    "term": {
      "tag": <TAG>
    }
  }
} 

Попробовал это в искре (код ниже):

data.writeStream
      .outputMode("append")
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "localhost")
      .option("es.port", "9200")
      .option("es.resource","test/_doc")
      .option("checkpointLocation", "/var/log/es_cp")
      .option("es.mapping.id", "_id")
      .option("es.update.script.inline", "ctx._source.value++")
      .option("es.write.operation", "upsert")
      .start()
      .awaitTermination()

Но для этого требуется идентификатор. Я используюasticsearch-had oop 6.8.6 для записи в ES-приемник.

...