Согласованность KSQL - PullRequest
       7

Согласованность KSQL

0 голосов
/ 09 декабря 2018

Я делаю PoC, используя dotnet и ksql.https://github.com/pablocastilla/kafkiano/

Общая идея состоит в том, чтобы посмотреть, смогу ли я реализовать бизнес-логику с использованием KSQL.В примере я представляю устройства на складе и делаю заказы из него.Пример состоит в следующем:

Два основных потока:

  • Поток инвентаря получает событие добавления в инвентарь.
  • Поток заказов получает заказы продуктов.

С этими потоками я создаю две таблицы:

  • ProductStock: просто добавляет товары на склад
  • Заказы: подсчитывает заказы по продукту

После этих двух таблиц я создаю еще одну таблицу с разницей между заказами и продуктами в инвентаре, просто чтобы узнать, остались ли продукты.

С объединением в этой последней таблице иПоток заказов У меня может быть запас при обработке этого заказа.

Я представляю события, используя название продукта в качестве ключа.Пока это хорошо работает на моей машине, но мои вопросы:

  • Соответствует ли это в большой производственной среде?Я хотел бы знать ограничения о том, когда согласованность нарушается, когда много событий принимается параллельно.

  • Как узнать, какие запросы выполняются раньше других?Мне нужно посчитать разницу между запасами и заказами, прежде чем я соединю эту разницу с потоком заказов

Спасибо

KSQL:
//INVENTORY STREAMS
CREATE STREAM InventoryEventsStream (ProductName VARCHAR, Quantity INT) WITH (kafka_topic='INVENTORYEVENTS', key='ProductName', value_format='json');

//TABLE GROUPING BY PRODUCT
CREATE TABLE  ProductsStock as select ProductName,sum(Quantity) as Stock from InventoryEventsStream group by ProductName;

// ORDERS STREAMS
CREATE STREAM OrdersCreatedStream (ProductName VARCHAR,Quantity INT, OrderId VARCHAR, User VARCHAR) WITH (kafka_topic='ORDERSEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsOrdered as select ProductName as ProductName,sum(Quantity) as Orders from  ORDERSCREATEDSTREAM group by ProductName;

// join with the difference
CREATE TABLE StockByProductTable  AS  SELECT ps.ProductName as ProductName,ps.Stock - op.Orders as Stock FROM PRODUCTSORDERED op JOIN ProductsStock ps ON op.ProductName = ps.ProductName;

//logic: I want the stock left when I make an order
SELECT ocs.OrderId,ocs.User,sbpt.Stock FROM OrdersCreatedStream ocs JOIN  StockByProductTable sbpt ON sbpt.ProductName = ocs.ProductName;

1 Ответ

0 голосов
/ 12 декабря 2018

Я копирую и вставляю ответ github Я получил от команды слияния:

"Я получил ваш вопрос. Минимальный ответ на ваш вопрос - он выполняется, как только ваше сообщениедоступна в потоке.

Хорошей аналогией может быть механизм, который всегда работает. Всякий раз, когда полезная нагрузка входит внутрь, она просто обрабатывает ее. Теперь это касается цепочки. Вы вставляете некоторыеполезная нагрузка в новый поток записей после обработки? Тогда да, вы можете назвать это «цепочкой». Как только вы запустите / выполните операторы CTAS / CSAS, вы увидите что-то вроде «Таблица / Поток создан и запущен», это именно то, что он означает.

Вы подожгли постоянно работающий запрос! "

...