Для данных, которые передаются из нашей системы заявок, мы пытаемся добиться следующего:
Получите количество открытых заявок, сгруппированных по статусу и клиенту.Упрощенная схема выглядит следующим образом:
Field | Type
-------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ID | BIGINT
TICKET_ID | BIGINT
STATUS | VARCHAR(STRING)
TICKETCATEGORY_ID | BIGINT
SUBJECT | VARCHAR(STRING)
PRIORITY | VARCHAR(STRING)
STARTTIME | BIGINT
ENDTIME | BIGINT
CHANGETIME | BIGINT
REMINDTIME | BIGINT
DEADLINE | INTEGER
CONTACT_ID | BIGINT
. Мы хотим использовать эти данные, чтобы получить количество заявок с определенным статусом (открыт, ожидание, выполняется и т. Д.) Для каждого клиента.Эти данные относятся к одному сообщению в другой теме. Схема может выглядеть следующим образом:
Field | Type
-------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
CONTACT_ID | BIGINT
COUNT_OPEN | BIGINT
COUNT_WAITING | BIGINT
COUNT_CLOSED | BIGINT
Мы планируем использовать эти и другие данные для обогащения информации о клиентах и публикации расширенного набора данных во внешней системе (например,asticsearch)
Получить первую часть довольно просто - сгруппировать заявки по клиенту и статусу.
select contact_id,status count(*) cnt from tickets group by contact_id,status;
Но теперь мы застряли - мы получаем несколько строк / сообщений для каждого клиента,и мы просто не знаем, как преобразовать их в одно сообщение с ключом contact_id.
Мы пробовали соединения, но все наши попытки ни к чему не привели.
Пример
Создать таблицу для всех заявок со статусом «ожидание», сгруппированных по клиентам
create table waiting_tickets_by_cust with (partitions=12,value_format='AVRO')
as select contact_id, count(*) cnt from tickets where status='waiting' group by contact_id;
Таблица повторных ключей для объединения
CREATE TABLE T_WAITING_REKEYED with WITH (KAFKA_TOPIC='WAITING_TICKETS_BY_CUST',
VALUE_FORMAT='AVRO',
KEY='contact_id');
Слева (внешнее) соединение этой таблицы с нашей таблицей клиентов дает нам всех клиентов, у которых есть ожидающие билеты.
select c.id,w.cnt wcnt from T_WAITING_REKEYED w left join CRM_CONTACTS c on w.contact_id=c.id;
Но нам нужны все клиенты с числом ожидания NULLED, чтобы использовать этот результат в другом соединении с билетами встатус ОБРАБОТКА.Поскольку у нас есть только клиенты с ожиданием, мы получаем только тех, у кого есть значения для обоих состояний.
ksql> select c.*,t.cnt from T_PROCESSING_REKEYED t left join cust_ticket_tmp1 c on t.contact_id=c.id;
null | null | null | null | 1
1555261086669 | 1472 | 1472 | 0 | 1
1555261086669 | 1472 | 1472 | 0 | 1
null | null | null | null | 1
1555064371937 | 1474 | 1474 | 1 | 1
null | null | null | null | 1
1555064371937 | 1474 | 1474 | 1 | 1
null | null | null | null | 1
null | null | null | null | 1
null | null | null | null | 1
1555064372018 | 3 | 3 | 5 | 6
1555064372018 | 3 | 3 | 5 | 6
Так каков правильный подход к этому?
Это KSQL 5.2.1
Спасибо
Редактировать:
Вот некоторые примеры данных
Создана ТЕМА, которая ограничиваетданные для тест-аккаунта
CREATE STREAM tickets_filtered
WITH (
PARTITIONS=12,
VALUE_FORMAT='JSON') AS
SELECT id,
contact_id,
subject,
status,
TIMESTAMPTOSTRING(changetime, 'yyyy-MM-dd HH:mm:ss.SSS') AS timestring
FROM tickets where contact_id=1472
PARTITION BY contact_id;
00:06:44 1 $ kafkacat-dev -C -o beginning -t TICKETS_FILTERED
{"ID":2216,"CONTACT_ID":1472,"SUBJECT":"Test Bodenbach","STATUS":"closed","TIMESTRING":"2012-11-08 10:34:30.000"}
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"processing","TIMESTRING":"2019-04-16 23:52:08.000"}
Changing and adding something in the ticketing-system...
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-17 00:10:38.000"}
{"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"new","TIMESTRING":"2019-04-17 00:11:23.000"}
{"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"close-request","TIMESTRING":"2019-04-17 00:12:04.000"}
Мы хотим создать тему из этих данных, где сообщения будут выглядеть следующим образом
{"CONTACT_ID":1472,"TICKETS_CLOSED":1,"TICKET_WAITING":1,"TICKET_CLOSEREQUEST":1,"TICKET_PROCESSING":0}