Представим и запросы, и продукты в виде массивов шагов, необходимых для их создания. В худшем случае предположим, что каждый шаг может содержать настраиваемую информацию, поэтому представление должно иметь тип ARRAY<MAP<STRING,STRING>>
. Использование массива в качестве ключа в настоящее время не поддерживается в ksqlDB, но мы можем «обмануть», используя преобразование массива в виде строки в качестве ключа.
Давайте сначала разберем logi c: - Любой новый запрос должен запускать запросы любых отсутствующих предварительных условий. - Любой новый запрос, соответствующий существующему продукту, следует игнорировать. - Любой новый запрос без соответствующего продукта, но с подходящим обязательным продуктом должен быть перенаправлен в поток задач приложения. - Любой новый запрос, в котором нет ни подходящего продукта, ни соответствующего необходимого продукта, должен инициировать создание нового запроса для необходимого продукта. - Любой новый продукт, который является предпосылкой для запроса, должен инициировать пересылку запроса в поток задач приложения.
Сначала давайте объединим наши потоки запросов и таблицы.
CREATE STREAM requests (stages ARRAY<MAP<STRING,STRING>>) WITH (kafka_topic='requests', value_format='json', partitions=1);
CREATE STREAM keyed_requests WITH (kafka_topic='keyed_requests', value_format='json', partitions=1) AS
SELECT
CAST(stages AS STRING) AS request_id,
CAST(SLICE(stages,1,ARRAY_LENGTH(stages)-1) AS STRING) AS prereq_id,
stages,
SLICE(stages,1,ARRAY_LENGTH(stages)-1) AS prereq
FROM requests
PARTITION BY CAST(stages AS STRING);
CREATE TABLE tbl_requests (ROWKEY STRING PRIMARY KEY, request_id STRING, prereq_id STRING, stages ARRAY<MAP<STRING,STRING>>, prereq ARRAY<MAP<STRING,STRING>>) WITH (kafka_topic='keyed_requests', value_format='json');
CREATE TABLE tbl_requests_copy AS SELECT * FROM tbl_requests;
CREATE TABLE prereq_requests AS SELECT prereq_id, COLLECT_SET(request_id) AS requests FROM keyed_requests GROUP BY prereq_id;
INSERT INTO requests
SELECT
keyed_requests.prereq AS stages
FROM keyed_requests
LEFT JOIN tbl_requests_copy ON keyed_requests.prereq_id = tbl_requests_copy.ROWKEY
WHERE tbl_requests_copy.ROWKEY IS NULL;
Далее создадим поток / таблицу завершенных продуктов:
CREATE STREAM products (ROWKEY STRING KEY) WITH (kafka_topic='products', value_format='json', partitions=1);
CREATE TABLE tbl_products (ROWKEY STRING PRIMARY KEY) WITH (kafka_topic='products', value_format='json');
Теперь создадим новый поток для запросов с выполненными предпосылками:
CREATE STREAM completed_prereq_requests (ROWKEY STRING KEY, request_id STRING) WITH (kafka_topic='completed_prereq_requests', value_format='json', partitions=1);
Давайте вставим новые запросы в поток :
INSERT INTO completed_prereq_requests
SELECT
request_id
FROM keyed_requests
LEFT JOIN tbl_products ON keyed_requests.prereq_id = tbl_products.ROWKEY
WHERE keyed_requests.prereq_id = '[]' OR tbl_products.ROWKEY IS NOT NULL
EMIT CHANGES;
Давайте также вставим в поток завершенные продукты:
INSERT INTO completed_prereq_requests
SELECT
EXPLODE(prereq_requests.requests) AS request_id
FROM products
INNER JOIN prereq_requests ON products.ROWKEY = prereq_requests.ROWKEY
EMIT CHANGES;
Теперь у нас есть поток идентификаторов запросов с выполненными предпосылками, но он по-прежнему включает запросы, которые уже были выполнены самих себя. Кроме того, сами массивы запросов не входят в этот поток. Отфильтруем выполненные запросы и пополним их объектами массива в новом потоке:
CREATE STREAM tasks AS
SELECT
tbl_requests.stages AS stages
FROM completed_prereq_requests
LEFT JOIN tbl_requests ON completed_prereq_requests.request_id = tbl_requests.ROWKEY
LEFT JOIN tbl_products ON completed_prereq_requests.request_id = tbl_products.ROWKEY
WHERE tbl_products.ROWKEY IS NULL
PARTITION BY completed_prereq_requests.request_id;
Чтобы проверить это, выберите поток задач в одном окне ksqlDB с SELECT * FROM tasks EMIT CHANGES;
. Затем откройте другое окно ksqlDB и создайте запрос и начните выпускать продукты. Посмотрите, что происходит на другом экране.
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p':='x'),MAP('r':='y')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p':='x')] AS STRING));
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p':='x'),MAP('r':='y2')]);
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y'),MAP('n':='z')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p1':='x1','p2':='x2')] AS STRING));
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y'),MAP('n':='z2')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y')] AS STRING));