эластичный запрос для получения событий, в которых отсутствует соответствующая пара - PullRequest
0 голосов
/ 15 октября 2019

У меня есть записи транзакции, которые следуют за следующим жизненным циклом.

  • Событие, когда транзакция получена [RCVD]
  • Событие, когда транзакция ожидает выполнения [PNDG] (ДОПОЛНИТЕЛЬНЫЙ шаг)
  • Событие при его выполнении [SENT]

Ниже приведены 7 примеров событий в индексе:

{trxID: 1, status:RCVD}
{trxID: 2, status:RCVD}
{trxID: 3, status:RCVD}
{trxID: 2, status:PNDG}
{trxID: 3, status:PNDG}
{trxID: 1, status:SENT}
{trxID: 2, status:SENT}

Мне нужно найти все транзакции, которыеперешел в состояние ожидания, но еще не выполнено. Другими словами, для транзакции должен быть статус PNDG , но не SENT . Я пытаюсь не делать это на уровне Java.

Я сделал агрегацию по trxID, а затем я сделал субагрегацию по статусу. Тогда я не могу понять, как получить те записи, где ведро имеет только PNDG в субагрегации. Я не уверен, что я думаю в правильном направлении.

Ожидаемый результат: trxID 3 , потому что для этой транзакции мы получили статус PNDG , но неполучить отправлено еще. С другой стороны, TrxUD 1 не следует сообщать, поскольку он никогда не переходил в состояние PNDG (в ожидании), независимо от того, сообщается ли о состоянии SENT о том, что нет.

Ответы [ 2 ]

1 голос
/ 15 октября 2019

Вы можете использовать количество статусов под идентификатором транзакции.

GET index24/_search
{
  "size": 0,
  "aggs": {
    "transactionId": {
      "terms": {
        "field": "trxID",
        "size": 10
      },
      "aggs": {
        "status": {
          "terms": {
            "field": "status.keyword",
            "size": 10
          }
        },
        "count": {
          "cardinality": {
            "field": "status.keyword"
          }
        },
        "my_bucketselector": {
          "bucket_selector": {
            "buckets_path": {
              "statusCount": "count"
            },
            "script": "params.statusCount==1"
          }
        }
      }
    }
  }
}

Ответ:

"aggregations" : {
    "transactionId" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : 4,
          "doc_count" : 1,
          "count" : {
            "value" : 1
          },
          "status" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 0,
            "buckets" : [
              {
                "key" : "PNDG",
                "doc_count" : 1
              }
            ]
          }
        }
      ]
    }
  }

РЕДАКТИРОВАТЬ 1: я пытался с ниже: - Получить максимальную дату для транзакцииидентификатор, а затем получить дату в ожидании. Если обе даты совпадают, то ожидающий является последним статусом

Данные:

 [
      {
        "_index" : "index24",
        "_type" : "_doc",
        "_id" : "aYCs0m0BD5PlkoxXxO36",
        "_score" : 1.0,
        "_source" : {
          "trxID" : 1,
          "status" : "RCVD",
          "date" : "2019-10-15T12:00:00"
        }
      },
      {
        "_index" : "index24",
        "_type" : "_doc",
        "_id" : "aoCs0m0BD5PlkoxX7e35",
        "_score" : 1.0,
        "_source" : {
          "trxID" : 1,
          "status" : "PNDG",
          "date" : "2019-10-15T12:01:00"
        }
      },
      {
        "_index" : "index24",
        "_type" : "_doc",
        "_id" : "a4Ct0m0BD5PlkoxXCO06",
        "_score" : 1.0,
        "_source" : {
          "trxID" : 1,
          "status" : "SENT",
          "date" : "2019-10-15T12:02:00"
        }
      },
      {
        "_index" : "index24",
        "_type" : "_doc",
        "_id" : "bICt0m0BD5PlkoxXQe0Y",
        "_score" : 1.0,
        "_source" : {
          "trxID" : 2,
          "status" : "RCVD",
          "date" : "2019-10-15T12:00:00"
        }
      },
      {
        "_index" : "index24",
        "_type" : "_doc",
        "_id" : "bYCt0m0BD5PlkoxXZO2x",
        "_score" : 1.0,
        "_source" : {
          "trxID" : 2,
          "status" : "PNDG",
          "date" : "2019-10-15T12:01:00"
        }
      },
      {
        "_index" : "index24",
        "_type" : "_doc",
        "_id" : "boCt0m0BD5PlkoxXju1H",
        "_score" : 1.0,
        "_source" : {
          "trxID" : 3,
          "status" : "RCVD",
          "date" : "2019-10-15T12:00:00"
        }
      },
      {
        "_index" : "index24",
        "_type" : "_doc",
        "_id" : "b4Ct0m0BD5PlkoxXou0-",
        "_score" : 1.0,
        "_source" : {
          "trxID" : 3,
          "status" : "SENT",
          "date" : "2019-10-15T12:01:00"
        }
      }
    ]

Запрос:

GET index24/_search
{
  "size": 0,
  "aggs": {
    "transactionId": {
      "terms": {
        "field": "trxID",
        "size": 10000
      },
      "aggs": {
        "maxDate": {
          "max": {
            "field": "date"  ---> get max date under transactions
          }
        },
        "pending_status": {
          "filter": {
            "term": {
              "status.keyword": "PNDG" ---> filter for pending
            }
          },
          "aggs": {
            "filtered_maxdate": {
              "max": {
                "field": "date"    --> get date under pending
              }
            }
          }
        },
        "buckets_latest_status_pending": { -->filter if max date==pending date
          "bucket_selector": {
            "buckets_path": {
              "filtereddate": "pending_status>filtered_maxdate",
              "maxDate": "maxDate"
            },
            "script": "params.filtereddate==params.maxDate"
          }
        }
      }
    }
  }
}

Ответ:

{
    "transactionId" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : 2,  --> only transaction id 2 is returned
          "doc_count" : 2,
          "pending_status" : {
            "doc_count" : 1,
            "filtered_maxdate" : {
              "value" : 1.57114086E12,
              "value_as_string" : "2019-10-15T12:01:00.000Z"
            }
          },
          "maxDate" : {
            "value" : 1.57114086E12,
            "value_as_string" : "2019-10-15T12:01:00.000Z"
          }
        }
      ]
    }
  }
0 голосов
/ 15 октября 2019

Я выполнил агрегацию по trxID, а затем выполнил субагрегацию по статусу.

Это отличное начало !!!

Теперь вы можете использоватьbucket_selector конвейерная агрегация , чтобы покрыть только транзакции, которые имеют только 1 или 2 документа, т.е. условие сценария params.eventCount < 3 гарантирует, что перехватит все сегменты, которые имеют RCVD и / или PNDGдокументы, но не SENT документы:

POST events/_search
{
  "size": 0,
  "aggs": {
    "trx": {
      "terms": {
        "field": "trxID",
        "size": 1000
      },
      "aggs": {
        "count": {
          "cardinality": {
            "field": "status.keyword"
          }
        },
        "not_sent": {
          "bucket_selector": {
            "buckets_path": {
              "eventCount": "count"
            },
            "script": "params.eventCount < 3"
          }
        }
      }
    }
  }
}

В вашем случае это приведет к этому, то есть только к событию с trxID = 3:

  "aggregations" : {
    "trx" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : 3,
          "doc_count" : 2,
          "count" : {
            "value" : 2
          }
        }
      ]
    }
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...