Как обновить счетчик в ElasticSearch от структурированной потоковой передачи Spark? - PullRequest
0 голосов
/ 29 августа 2018

Я работаю над проектом Spark Structured Streaming, цель - сохранить журналы активности пользователей в ElasticSearch.

Проблема :

  1. Когда user_id отображается впервые за последние 8 часов, создайте новую запись в ElasticSearch и установите для counter в документе значение 1;
  2. Если за последние 8 часов было зарегистрировано больше действий (журналов) одного и того же пользователя, обновите поле counter, добавьте количество действий к его значению и, наконец, обновите поле update_time.

Установка "es.mapping.id" -> "user_id" и "es.write.operation" -> "upsert" - это насколько я могу, но я не могу обновить счетчик и время при обновлении. Может быть, es.update.script.inline поможет?

1 Ответ

0 голосов
/ 29 августа 2018

Через некоторое время после прочтения документа ES Scripted Updates , вот простое решение для обновления counter с использованием безболезненных встроенных сценариев.

Итак, ключ в том, чтобы использовать безболезненный скрипт ctx._source.counter += params.counter, который counter представляет мой столбец DataFrame 'counter, который должен быть агрегирован раньше.

В конце концов, я в итоге так:

val esOptions = Map(
   "es.write.operation"      -> "upsert"
  ,"es.mapping.id"           -> "user_id"
  ,"es.update.script.lang"   -> "painless"
  ,"es.update.script.inline" -> "ctx._source.counter += params.counter"
  ,"es.update.script.params" -> "counter:counter"

df.writeStream.options(esOptions)
  .format("org.elasticsearch.spark.sql")
  .start("user_activity/log")

Опять же, это решает только обновление счетчика. Позже добавлю способ обновить update_time поле, когда я его прибью.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...