Обработка многоступенчатого запроса в ksqlDB - PullRequest
0 голосов
/ 17 июня 2020

Предположим, существует поток многоступенчатых запросов для документов с заданными идентификаторами, которые необходимо извлечь из источника, с корректировкой контраста, распознаванием текста, подсчетом слов, помещением в каталог назначения и т. Д. c. (точные этапы могут варьироваться от запроса к запросу). Этапы запроса должны выполняться по порядку, а промежуточные результаты в кеше должны использоваться повторно (не хотите, чтобы один и тот же документ повторялся снова и снова). Я хочу создать поток запросов для отдельных приложений (приложение для поиска документов, приложение OCR и т. Д. c.). Как я могу это сделать с помощью ksqlDB?

1 Ответ

0 голосов
/ 17 июня 2020

Представим и запросы, и продукты в виде массивов шагов, необходимых для их создания. В худшем случае предположим, что каждый шаг может содержать настраиваемую информацию, поэтому представление должно иметь тип ARRAY<MAP<STRING,STRING>>. Использование массива в качестве ключа в настоящее время не поддерживается в ksqlDB, но мы можем «обмануть», используя преобразование массива в виде строки в качестве ключа.

Давайте сначала разберем logi c: - Любой новый запрос должен запускать запросы любых отсутствующих предварительных условий. - Любой новый запрос, соответствующий существующему продукту, следует игнорировать. - Любой новый запрос без соответствующего продукта, но с подходящим обязательным продуктом должен быть перенаправлен в поток задач приложения. - Любой новый запрос, в котором нет ни подходящего продукта, ни соответствующего необходимого продукта, должен инициировать создание нового запроса для необходимого продукта. - Любой новый продукт, который является предпосылкой для запроса, должен инициировать пересылку запроса в поток задач приложения.

Сначала давайте объединим наши потоки запросов и таблицы.

CREATE STREAM requests (stages ARRAY<MAP<STRING,STRING>>) WITH (kafka_topic='requests', value_format='json', partitions=1);

CREATE STREAM keyed_requests WITH (kafka_topic='keyed_requests', value_format='json', partitions=1) AS
SELECT
  CAST(stages AS STRING) AS request_id,
  CAST(SLICE(stages,1,ARRAY_LENGTH(stages)-1) AS STRING) AS prereq_id,
  stages,
  SLICE(stages,1,ARRAY_LENGTH(stages)-1) AS prereq
FROM requests
PARTITION BY CAST(stages AS STRING);

CREATE TABLE tbl_requests (ROWKEY STRING PRIMARY KEY, request_id STRING, prereq_id STRING, stages ARRAY<MAP<STRING,STRING>>, prereq ARRAY<MAP<STRING,STRING>>) WITH (kafka_topic='keyed_requests', value_format='json');

CREATE TABLE tbl_requests_copy AS SELECT * FROM tbl_requests;

CREATE TABLE prereq_requests AS SELECT prereq_id, COLLECT_SET(request_id) AS requests FROM keyed_requests GROUP BY prereq_id;

INSERT INTO requests
SELECT
  keyed_requests.prereq AS stages
FROM keyed_requests
LEFT JOIN tbl_requests_copy ON keyed_requests.prereq_id = tbl_requests_copy.ROWKEY
WHERE tbl_requests_copy.ROWKEY IS NULL;

Далее создадим поток / таблицу завершенных продуктов:

CREATE STREAM products (ROWKEY STRING KEY) WITH (kafka_topic='products', value_format='json', partitions=1);
CREATE TABLE tbl_products (ROWKEY STRING PRIMARY KEY) WITH (kafka_topic='products', value_format='json');

Теперь создадим новый поток для запросов с выполненными предпосылками:

CREATE STREAM completed_prereq_requests (ROWKEY STRING KEY, request_id STRING) WITH (kafka_topic='completed_prereq_requests', value_format='json', partitions=1);

Давайте вставим новые запросы в поток :

INSERT INTO completed_prereq_requests
SELECT
  request_id
FROM keyed_requests
LEFT JOIN tbl_products ON keyed_requests.prereq_id = tbl_products.ROWKEY
WHERE keyed_requests.prereq_id = '[]' OR tbl_products.ROWKEY IS NOT NULL
EMIT CHANGES;

Давайте также вставим в поток завершенные продукты:

INSERT INTO completed_prereq_requests
SELECT
  EXPLODE(prereq_requests.requests) AS request_id
FROM products
INNER JOIN prereq_requests ON products.ROWKEY = prereq_requests.ROWKEY
EMIT CHANGES;

Теперь у нас есть поток идентификаторов запросов с выполненными предпосылками, но он по-прежнему включает запросы, которые уже были выполнены самих себя. Кроме того, сами массивы запросов не входят в этот поток. Отфильтруем выполненные запросы и пополним их объектами массива в новом потоке:

CREATE STREAM tasks AS
SELECT
  tbl_requests.stages AS stages
FROM completed_prereq_requests
LEFT JOIN tbl_requests ON completed_prereq_requests.request_id = tbl_requests.ROWKEY
LEFT JOIN tbl_products ON completed_prereq_requests.request_id = tbl_products.ROWKEY
WHERE tbl_products.ROWKEY IS NULL
PARTITION BY completed_prereq_requests.request_id;

Чтобы проверить это, выберите поток задач в одном окне ksqlDB с SELECT * FROM tasks EMIT CHANGES;. Затем откройте другое окно ksqlDB и создайте запрос и начните выпускать продукты. Посмотрите, что происходит на другом экране.

INSERT INTO requests (stages) VALUES (ARRAY[MAP('p':='x'),MAP('r':='y')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p':='x')] AS STRING));
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p':='x'),MAP('r':='y2')]);
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y'),MAP('n':='z')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p1':='x1','p2':='x2')] AS STRING));
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y'),MAP('n':='z2')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y')] AS STRING));
...