Фильтр строк после агрегации Elasticsearch - PullRequest
1 голос
/ 03 октября 2019

У меня есть система с устройствами, которые обмениваются данными через некоторые шлюзы, а затем в бэкэнде метрики сохраняются вasticsearch.

Я хочу знать датчики, которые сейчас обмениваются данными через определенный gateway_id.

У меня есть такое сопоставление:

{
  "mappings": {
    "properties": {
      "context": {
        "properties": {
          "gateway": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "id": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
}},
      "timeserver": {
        "type": "date"
      },
      "timestamp": {
        "type": "date"
      },
      "type": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "value": {
        "type": "double"
      }
    }
  }
}

В поле шлюза сохраняется в виде строки идентификатор шлюза, используемый для каждой метрики.

Я могуполучить последнее сообщение для каждого устройства с помощью этого запроса:

GET _search
{
  "size": 0,
  "aggs": {
    "id_agg": {
      "terms": {
        "field": "context.id.keyword"
        , "size": 10000
      },
      "aggs": {
        "group_docs": {
          "top_hits": {
            "size": 1,
            "sort": [
              {
                "timestamp": {
                  "order": "desc"
                }
              }
            ]
          }
        }
      }
    }
  },
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "_index": "measurements.group.*"
          }
        }
      ]
    }
  }
}

Но как я могу отфильтровать этот результат агрегации, чтобы получить только датчики, которые в данный момент используют определенный шлюз? Добавляя что-то вроде: "filter": {"term":{"context.gateway": {"value": "request_gateway_serial" }} },

Я искал агрегацию bucket_selector и конвейерные агрегации, но не нашел пути, и мне кажется, что они работают только с числовыми значениями, без строк, как мой шлюзfield.

Пример запроса возвращает: (Список самых последних сообщений для каждого устройства)

"aggregations" : {
          {
            "key" : "1234",

                "context" : {
                  "gateway" : "123456",
                  "id" : "1234", 
          },{
            "key" : "12345",
                      "context" : {
                        "gateway" : "1234567",
                        "id" : "12345",
          }, {
             "key" : "12345678",
                     "context" : {
                        "gateway" : "1234567",
                        "id" : "12345678",
}} 

Мой ожидаемый результат - это фильтр для «шлюза»: «1234567» и получениетолько «ключ»: «12345» и «ключ»: «12345678»

1 Ответ

0 голосов
/ 03 октября 2019

Вы можете использовать фильтр агрегации

GET sensors/_search
{
  "size": 0,
  "aggs": {
    "filter_gateway": {
      "filter": {
        "term": {
          "context.gateway.keyword": "request_gateway_serial"
        }
      },
      "aggs": {
        "id_agg": {
          "terms": {
            "field": "context.id.keyword",
            "size": 10000
          },
          "aggs": {
            "group_docs": {
              "top_hits": {
                "size": 1,
                "sort": [
                  {
                    "timestamp": {
                      "order": "desc"
                    }
                  }
                ]
              }
            }
          }
        }
      }
    }
  },
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "_index": "measurements.group.*"
          }
        }
      ]
    }
  }
}

В зависимости от ваших требований вы также можете фильтровать документы в части запроса и затем выполнять агрегирование по ней.

РЕДАКТИРОВАТЬ 1:

В приведенном ниже запросе я получаю максимальную временную метку под идентификатором устройства и максимальную временную метку, отфильтрованную на данном шлюзе. Если обе даты совпадают, будет указан идентификатор устройства, который последний раз общался со шлюзом.

ex.

Запрос:

GET sensors/_search
{
  "size": 0,
  "aggs": {
    "id_agg": {
      "terms": {
        "field": "context.id.keyword",
        "size": 10000
      },
      "aggs": {
        "maxDate": {
          "max": {
            "field": "context.timestamp"
          }
        },
        "Filter": {
          "filter": {
            "term": {
              "context.gateway": "1234568"
            }
          },
          "aggs": {
            "filtered_maxdate": {
              "max": {
                "field": "context.timestamp"
              }
            }
          }
        },
        "last_geteway_filter": {
          "bucket_selector": {
            "buckets_path": {
              "filtereddate": "Filter>filtered_maxdate",
              "maxDate": "maxDate"
            },
            "script": "params.filtereddate==params.maxDate"
          }
        }
      }
    }
  }
}

Данные:

 [
      {
        "_index" : "sensors",
        "_type" : "_doc",
        "_id" : "eiZ1pW0BcOVYVz455V6s",
        "_score" : 1.0,
        "_source" : {
          "context.gateway" : "1234567",
          "context.id" : 1234,
          "context.timestamp" : "2019-10-02"
        }
      },
      {
        "_index" : "sensors",
        "_type" : "_doc",
        "_id" : "eyZ2pW0BcOVYVz45B14T",
        "_score" : 1.0,
        "_source" : {
          "context.gateway" : "1234568",
          "context.id" : 1234,
          "context.timestamp" : "2019-10-03"
        }
      },
      {
        "_index" : "sensors",
        "_type" : "_doc",
        "_id" : "fCZ2pW0BcOVYVz45Jl6m",
        "_score" : 1.0,
        "_source" : {
          "context.gateway" : "1234569",
          "context.id" : 1234,
          "context.timestamp" : "2019-10-04"
        }
      },
      {
        "_index" : "sensors",
        "_type" : "_doc",
        "_id" : "fSZ2pW0BcOVYVz45dV48",
        "_score" : 1.0,
        "_source" : {
          "context.gateway" : "1234567",
          "context.id" : 1235,
          "context.timestamp" : "2019-10-02"
        }
      },
      {
        "_index" : "sensors",
        "_type" : "_doc",
        "_id" : "fiZ2pW0BcOVYVz45l17A",
        "_score" : 1.0,
        "_source" : {
          "context.gateway" : "1234568",
          "context.id" : 1235,
          "context.timestamp" : "2019-10-03"
        }
      }
    ]
  }

Результат:

Device 12345 had last document under gateway 1234568

"buckets" : [
        {
          "key" : "1235",
          "doc_count" : 2,
          "Filter" : {
            "doc_count" : 1,
            "filtered_maxdate" : {
              "value" : 1.5700608E12,
              "value_as_string" : "2019-10-03T00:00:00.000Z"
            }
          },
          "maxDate" : {
            "value" : 1.5700608E12,
            "value_as_string" : "2019-10-03T00:00:00.000Z"
          }
        }
      ]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...