Через некоторое время после прочтения документа ES Scripted Updates , вот простое решение для обновления counter
с использованием безболезненных встроенных сценариев.
Итак, ключ в том, чтобы использовать безболезненный скрипт ctx._source.counter += params.counter
, который counter
представляет мой столбец DataFrame 'counter
, который должен быть агрегирован раньше.
В конце концов, я в итоге так:
val esOptions = Map(
"es.write.operation" -> "upsert"
,"es.mapping.id" -> "user_id"
,"es.update.script.lang" -> "painless"
,"es.update.script.inline" -> "ctx._source.counter += params.counter"
,"es.update.script.params" -> "counter:counter"
df.writeStream.options(esOptions)
.format("org.elasticsearch.spark.sql")
.start("user_activity/log")
Опять же, это решает только обновление счетчика. Позже добавлю способ обновить update_time
поле, когда я его прибью.