Несмотря на то, что я хотел бы избежать сценариев, Агрегирование метрик по сценариям кажется единственным способом выполнить то, что было запрошено:
{
"size": 0,
"aggs": {
"visitors": {
"scripted_metric": {
"init_script": "params._agg.dateMap = new HashMap();",
"map_script": "params._agg.dateMap.merge(doc.userId[0].toString(), doc.eventDate.value, (e1, e2) -> e1.isBefore(e2) ? e1 : e2);",
"combine_script": "return params._agg.dateMap;",
"reduce_script": "def dateMap = new HashMap(); for (map in params._aggs) { if (map == null) continue; for (entry in map.entrySet()) dateMap.merge(entry.key, entry.value, (e1, e2) -> e1.isBefore(e2) ? e1 : e2); } def hist = new TreeMap(); for (entry in dateMap.entrySet()) hist.merge(entry.value.toString(), 1, (a, b) -> a + 1); return hist;"
}
}
}
}
Init просто создает пустой HashMap, Mapзаполняет эту карту с помощью userId в качестве ключа и устанавливает в качестве значения самую старую eventDate, а Combine просто разворачивает карту, которая должна быть передана в Reduce:
def dateMap = new HashMap();
for (map in params._aggs) {
if (map == null) continue;
for (entry in map.entrySet())
dateMap.merge(entry.key, entry.value, (e1, e2) -> e1.isBefore(e2) ? e1 : e2);
}
def hist = new TreeMap();
for (entry in dateMap.entrySet())
hist.merge(entry.value.toString(), 1, (a, b) -> a + 1);
return hist;
Up для объединения код был выполнен для каждого узла кластера,Reduce объединяет все карты в одну (т.е. dateMap), сохраняя самую старую eventDate для userId.Затем он подсчитывает вхождения каждой eventDate.
Результат:
"aggregations": {
"visitors": {
"value": {
"2019-03-04T13:40:18.514Z": 1,
"2019-03-04T13:46:18.514Z": 1,
"2019-03-04T13:50:18.514Z": 1,
"2019-03-05T13:46:18.514Z": 1
}
}
}
Единственная недостающая часть состоит в том, что эти значения должны быть сгруппированы в гистограмму в коде приложения.
Примечание¹: Используйте на свой страх и риск , я не знаю, сильно ли увеличивается потребление памяти из-за этих хэш-карт или насколько хорошо она работает на больших наборах данных.
Примечание²: запускиз Elasticsearch 6.4 state
и states
следует использовать вместо params._agg
и params._aggs
.