Агрегирование по нескольким полям и сопоставление с одним результатом - PullRequest
0 голосов
/ 15 апреля 2019

Для данных, которые передаются из нашей системы заявок, мы пытаемся добиться следующего:

Получите количество открытых заявок, сгруппированных по статусу и клиенту.Упрощенная схема выглядит следующим образом:


 Field               | Type                      
-------------------------------------------------
 ROWTIME             | BIGINT           (system) 
 ROWKEY              | VARCHAR(STRING)  (system) 
 ID                  | BIGINT                    
 TICKET_ID           | BIGINT                    
 STATUS              | VARCHAR(STRING)           
 TICKETCATEGORY_ID   | BIGINT                    
 SUBJECT             | VARCHAR(STRING)           
 PRIORITY            | VARCHAR(STRING)           
 STARTTIME           | BIGINT                    
 ENDTIME             | BIGINT                    
 CHANGETIME          | BIGINT                    
 REMINDTIME          | BIGINT                    
 DEADLINE            | INTEGER                   
 CONTACT_ID          | BIGINT           

. Мы хотим использовать эти данные, чтобы получить количество заявок с определенным статусом (открыт, ожидание, выполняется и т. Д.) Для каждого клиента.Эти данные относятся к одному сообщению в другой теме. Схема может выглядеть следующим образом:

 Field               | Type                      
-------------------------------------------------
 ROWTIME             | BIGINT           (system) 
 ROWKEY              | VARCHAR(STRING)  (system) 
 CONTACT_ID          | BIGINT                    
 COUNT_OPEN          | BIGINT                    
 COUNT_WAITING       | BIGINT                    
 COUNT_CLOSED        | BIGINT                    

Мы планируем использовать эти и другие данные для обогащения информации о клиентах и ​​публикации расширенного набора данных во внешней системе (например,asticsearch)

Получить первую часть довольно просто - сгруппировать заявки по клиенту и статусу.

select contact_id,status count(*) cnt from tickets group by contact_id,status;

Но теперь мы застряли - мы получаем несколько строк / сообщений для каждого клиента,и мы просто не знаем, как преобразовать их в одно сообщение с ключом contact_id.

Мы пробовали соединения, но все наши попытки ни к чему не привели.

Пример

Создать таблицу для всех заявок со статусом «ожидание», сгруппированных по клиентам

create table waiting_tickets_by_cust with (partitions=12,value_format='AVRO')
as select contact_id, count(*) cnt from tickets where status='waiting' group by contact_id;

Таблица повторных ключей для объединения

CREATE TABLE T_WAITING_REKEYED with WITH (KAFKA_TOPIC='WAITING_TICKETS_BY_CUST',
       VALUE_FORMAT='AVRO',
       KEY='contact_id');

Слева (внешнее) соединение этой таблицы с нашей таблицей клиентов дает нам всех клиентов, у которых есть ожидающие билеты.

select c.id,w.cnt wcnt from T_WAITING_REKEYED w left join CRM_CONTACTS c on w.contact_id=c.id;

Но нам нужны все клиенты с числом ожидания NULLED, чтобы использовать этот результат в другом соединении с билетами встатус ОБРАБОТКА.Поскольку у нас есть только клиенты с ожиданием, мы получаем только тех, у кого есть значения для обоих состояний.

ksql> select c.*,t.cnt from T_PROCESSING_REKEYED t left join cust_ticket_tmp1 c on t.contact_id=c.id;
null | null | null | null | 1
1555261086669 | 1472 | 1472 | 0 | 1
1555261086669 | 1472 | 1472 | 0 | 1
null | null | null | null | 1
1555064371937 | 1474 | 1474 | 1 | 1
null | null | null | null | 1
1555064371937 | 1474 | 1474 | 1 | 1
null | null | null | null | 1
null | null | null | null | 1
null | null | null | null | 1
1555064372018 | 3 | 3 | 5 | 6
1555064372018 | 3 | 3 | 5 | 6

Так каков правильный подход к этому?

Это KSQL 5.2.1

Спасибо

Редактировать:

Вот некоторые примеры данных

Создана ТЕМА, которая ограничиваетданные для тест-аккаунта

CREATE STREAM tickets_filtered
  WITH (
        PARTITIONS=12,
        VALUE_FORMAT='JSON') AS
  SELECT id,
         contact_id,
subject,
status,

         TIMESTAMPTOSTRING(changetime, 'yyyy-MM-dd HH:mm:ss.SSS') AS timestring
  FROM tickets where contact_id=1472
  PARTITION BY contact_id;

00:06:44 1 $ kafkacat-dev -C -o beginning -t TICKETS_FILTERED
{"ID":2216,"CONTACT_ID":1472,"SUBJECT":"Test Bodenbach","STATUS":"closed","TIMESTRING":"2012-11-08 10:34:30.000"}
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"processing","TIMESTRING":"2019-04-16 23:52:08.000"}
Changing and adding something in the ticketing-system...
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-17 00:10:38.000"}
{"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"new","TIMESTRING":"2019-04-17 00:11:23.000"}
{"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"close-request","TIMESTRING":"2019-04-17 00:12:04.000"}

Мы хотим создать тему из этих данных, где сообщения будут выглядеть следующим образом

{"CONTACT_ID":1472,"TICKETS_CLOSED":1,"TICKET_WAITING":1,"TICKET_CLOSEREQUEST":1,"TICKET_PROCESSING":0}

1 Ответ

0 голосов
/ 17 апреля 2019

( здесь тоже написано )

Это можно сделать, построив таблицу (для состояния), а затем агрегируя по этой таблице.

  1. Настройка тестовых данных

    kafkacat -b localhost -t tickets -P <<EOF
    {"ID":2216,"CONTACT_ID":1472,"SUBJECT":"Test Bodenbach","STATUS":"closed","TIMESTRING":"2012-11-08 10:34:30.000"}
    {"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
    {"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"processing","TIMESTRING":"2019-04-16 23:52:08.000"}
    {"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-17 00:10:38.000"}
    {"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"new","TIMESTRING":"2019-04-17 00:11:23.000"}
    {"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"close-request","TIMESTRING":"2019-04-17 00:12:04.000"}
    EOF
    
  2. Предварительный просмотр данных темы

    ksql> PRINT 'tickets' FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1555511270573,"ROWKEY":"null","ID":2216,"CONTACT_ID":1472,"SUBJECT":"Test Bodenbach","STATUS":"closed","TIMESTRING":"2012-11-08 10:34:30.000"}
    {"ROWTIME":1555511270573,"ROWKEY":"null","ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
    {"ROWTIME":1555511270573,"ROWKEY":"null","ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"processing","TIMESTRING":"2019-04-16 23:52:08.000"}
    {"ROWTIME":1555511270573,"ROWKEY":"null","ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-17 00:10:38.000"}
    {"ROWTIME":1555511270573,"ROWKEY":"null","ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"new","TIMESTRING":"2019-04-17 00:11:23.000"}
    {"ROWTIME":1555511270573,"ROWKEY":"null","ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"close-request","TIMESTRING":"2019-04-17 00:12:04.000"}
    
  3. Зарегистрировать поток

    CREATE STREAM TICKETS (ID INT, 
                          CONTACT_ID VARCHAR, 
                          SUBJECT VARCHAR, 
                          STATUS VARCHAR, 
                          TIMESTRING VARCHAR) 
            WITH (KAFKA_TOPIC='tickets', 
            VALUE_FORMAT='JSON');
    
  4. Запрос данных

    ksql> SET 'auto.offset.reset' = 'earliest';
    ksql> SELECT * FROM TICKETS;
    1555502643806 | null | 2216 | 1472 | Test Bodenbach | closed | 2012-11-08 10:34:30.000
    1555502643806 | null | 8945 | 1472 | sync-test | waiting | 2019-04-16 23:07:01.000
    1555502643806 | null | 8945 | 1472 | sync-test | processing | 2019-04-16 23:52:08.000
    1555502643806 | null | 8945 | 1472 | sync-test | waiting | 2019-04-17 00:10:38.000
    1555502643806 | null | 8952 | 1472 | another sync ticket | new | 2019-04-17 00:11:23.000
    1555502643806 | null | 8952 | 1472 | another sync ticket | close-request | 2019-04-17 00:12:04.000
    
  5. На данный момент мы можем использовать CASE для поворота агрегатов:

    SELECT CONTACT_ID, 
          SUM(CASE WHEN STATUS='new' THEN 1 ELSE 0 END) AS TICKETS_NEW, 
          SUM(CASE WHEN STATUS='processing' THEN 1 ELSE 0 END) AS TICKETS_PROCESSING, 
          SUM(CASE WHEN STATUS='waiting' THEN 1 ELSE 0 END) AS TICKETS_WAITING, 
          SUM(CASE WHEN STATUS='close-request' THEN 1 ELSE 0 END) AS TICKETS_CLOSEREQUEST ,
          SUM(CASE WHEN STATUS='closed' THEN 1 ELSE 0 END) AS TICKETS_CLOSED
      FROM TICKETS 
      GROUP BY CONTACT_ID;
    
      1472 | 1 | 1 | 2 | 1 | 1
    

    Но вы заметите, что ответ не такой, как ожидалось. Это потому, что мы считаем все шесть входных событий .

    Давайте посмотрим на один тикет, ID 8945 - он проходит три изменения состояния (waiting -> processing -> waiting), каждое из которых включается в совокупность. Мы можем проверить это следующим образом с помощью простого предиката:

    SELECT CONTACT_ID, 
          SUM(CASE WHEN STATUS='new' THEN 1 ELSE 0 END) AS TICKETS_NEW, 
          SUM(CASE WHEN STATUS='processing' THEN 1 ELSE 0 END) AS TICKETS_PROCESSING, 
          SUM(CASE WHEN STATUS='waiting' THEN 1 ELSE 0 END) AS TICKETS_WAITING, 
          SUM(CASE WHEN STATUS='close-request' THEN 1 ELSE 0 END) AS TICKETS_CLOSEREQUEST ,
          SUM(CASE WHEN STATUS='closed' THEN 1 ELSE 0 END) AS TICKETS_CLOSED
      FROM TICKETS 
      WHERE ID=8945
      GROUP BY CONTACT_ID;
    
    1472 | 0 | 1 | 2 | 0 | 0
    
  6. На самом деле нам нужно текущее состояние для каждого билета. Так что переделите данные на тикет ID:

    CREATE STREAM TICKETS_BY_ID AS SELECT * FROM TICKETS PARTITION BY ID;
    
    CREATE TABLE TICKETS_TABLE (ID INT, 
                          CONTACT_ID INT, 
                          SUBJECT VARCHAR, 
                          STATUS VARCHAR, 
                          TIMESTRING VARCHAR) 
            WITH (KAFKA_TOPIC='TICKETS_BY_ID', 
            VALUE_FORMAT='JSON',
            KEY='ID');
    
  7. Сравнить поток событий против текущее состояние

    • Поток событий (KSQL Stream)

      ksql> SELECT ID, TIMESTRING, STATUS FROM TICKETS;
      2216 | 2012-11-08 10:34:30.000 | closed
      8945 | 2019-04-16 23:07:01.000 | waiting
      8945 | 2019-04-16 23:52:08.000 | processing
      8945 | 2019-04-17 00:10:38.000 | waiting
      8952 | 2019-04-17 00:11:23.000 | new
      8952 | 2019-04-17 00:12:04.000 | close-request
      
    • Текущее состояние (таблица KSQL)

      ksql> SELECT ID, TIMESTRING, STATUS FROM TICKETS_TABLE;
      2216 | 2012-11-08 10:34:30.000 | closed
      8945 | 2019-04-17 00:10:38.000 | waiting
      8952 | 2019-04-17 00:12:04.000 | close-request
      
  8. Нам нужен агрегат таблицы - мы хотим выполнить тот же трюк SUM(CASE…)…GROUP BY, который мы делали выше, но на основе текущего состояния каждого тикета, а не каждого события:

      SELECT CONTACT_ID, 
          SUM(CASE WHEN STATUS='new' THEN 1 ELSE 0 END) AS TICKETS_NEW, 
          SUM(CASE WHEN STATUS='processing' THEN 1 ELSE 0 END) AS TICKETS_PROCESSING, 
          SUM(CASE WHEN STATUS='waiting' THEN 1 ELSE 0 END) AS TICKETS_WAITING, 
          SUM(CASE WHEN STATUS='close-request' THEN 1 ELSE 0 END) AS TICKETS_CLOSEREQUEST ,
          SUM(CASE WHEN STATUS='closed' THEN 1 ELSE 0 END) AS TICKETS_CLOSED
      FROM TICKETS_TABLE 
      GROUP BY CONTACT_ID;
    

    Это дает нам то, что мы хотим:

      1472 | 0 | 0 | 1 | 1 | 1
    
  9. Давайте добавим в тему события другого билета и посмотрим, как меняется состояние таблицы. Строки из таблицы переиздаются при изменении состояния; Вы также можете отменить SELECT и запустить его снова, чтобы увидеть только текущее состояние.

    Примеры данных, чтобы попробовать сами:

    {"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"new","TIMESTRING":"2019-04-16 23:07:01.000"}
    {"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"processing","TIMESTRING":"2019-04-16 23:07:01.000"}
    {"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
    {"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"processing","TIMESTRING":"2019-04-16 23:07:01.000"}
    {"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
    {"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"closed","TIMESTRING":"2019-04-16 23:07:01.000"}
    {"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"close-request","TIMESTRING":"2019-04-16 23:07:01.000"}
    

Если вы хотите попробовать это дальше, вы можете сгенерировать поток дополнительных фиктивных данных с помощью Mockaroo , переданного через awk, чтобы замедлить его, чтобы вы могли увидеть влияние на сгенерированные агрегаты по мере поступления каждого сообщения:

while [ 1 -eq 1 ]
  do curl -s "https://api.mockaroo.com/api/f2d6c8a0?count=1000&key=ff7856d0" | \
      awk '{print $0;system("sleep 2");}' | \
      kafkacat -b localhost -t tickets -P
  done
...