Я делаю PoC, используя dotnet и ksql.https://github.com/pablocastilla/kafkiano/
Общая идея состоит в том, чтобы посмотреть, смогу ли я реализовать бизнес-логику с использованием KSQL.В примере я представляю устройства на складе и делаю заказы из него.Пример состоит в следующем:
Два основных потока:
- Поток инвентаря получает событие добавления в инвентарь.
- Поток заказов получает заказы продуктов.
С этими потоками я создаю две таблицы:
- ProductStock: просто добавляет товары на склад
- Заказы: подсчитывает заказы по продукту
После этих двух таблиц я создаю еще одну таблицу с разницей между заказами и продуктами в инвентаре, просто чтобы узнать, остались ли продукты.
С объединением в этой последней таблице иПоток заказов У меня может быть запас при обработке этого заказа.
Я представляю события, используя название продукта в качестве ключа.Пока это хорошо работает на моей машине, но мои вопросы:
Соответствует ли это в большой производственной среде?Я хотел бы знать ограничения о том, когда согласованность нарушается, когда много событий принимается параллельно.
Как узнать, какие запросы выполняются раньше других?Мне нужно посчитать разницу между запасами и заказами, прежде чем я соединю эту разницу с потоком заказов
Спасибо
KSQL:
//INVENTORY STREAMS
CREATE STREAM InventoryEventsStream (ProductName VARCHAR, Quantity INT) WITH (kafka_topic='INVENTORYEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsStock as select ProductName,sum(Quantity) as Stock from InventoryEventsStream group by ProductName;
// ORDERS STREAMS
CREATE STREAM OrdersCreatedStream (ProductName VARCHAR,Quantity INT, OrderId VARCHAR, User VARCHAR) WITH (kafka_topic='ORDERSEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsOrdered as select ProductName as ProductName,sum(Quantity) as Orders from ORDERSCREATEDSTREAM group by ProductName;
// join with the difference
CREATE TABLE StockByProductTable AS SELECT ps.ProductName as ProductName,ps.Stock - op.Orders as Stock FROM PRODUCTSORDERED op JOIN ProductsStock ps ON op.ProductName = ps.ProductName;
//logic: I want the stock left when I make an order
SELECT ocs.OrderId,ocs.User,sbpt.Stock FROM OrdersCreatedStream ocs JOIN StockByProductTable sbpt ON sbpt.ProductName = ocs.ProductName;