Я получаю неожиданные результаты от запроса KSQL к KTable, который сам определяется темой Kafka. KTABLE - это «Сделки», и он поддерживается сжатой темой «localhost.dbo.TradeHistory». Предполагается, что он содержит самую последнюю информацию о сделке с акциями, имеющей идентификатор TradeId. Ключ темы - TradeId. Каждая сделка имеет AccountId, и я пытаюсь создать запрос для получения суммы сумм сделок, сгруппированных по счетам.
Определение сделок KTABLE
ksql> create table Trades(TradeId int, AccountId int, Spn int, Amount double) with (KAFKA_TOPIC = 'localhost.dbo.TradeHistory', VALUE_FORMAT = 'JSON', KEY = 'TradeId');
...
ksql> describe extended Trades;
Name : TRADES
Type : TABLE
Key field : TRADEID
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : localhost.dbo.TradeHistory (partitions: 1, replication: 1)
Field | Type
---------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
TRADEID | INTEGER
ACCOUNTID | INTEGER
SPN | INTEGER
AMOUNT | DOUBLE
---------------------------------------
Local runtime statistics
------------------------
consumer-messages-per-sec: 0 consumer-total-bytes: 3709 consumer-total-messages: 39 last-message: 2019-10-12T20:52:16.552Z
(Statistics of the local KSQL server interaction with the Kafka topic localhost.dbo.TradeHistory)
Конфигурация темы localhost.dbo.TradeHistory
/usr/bin/kafka-topics --zookeeper zookeeper:2181 --describe --topic localhost.dbo.TradeHistory
Topic:localhost.dbo.TradeHistory PartitionCount:1 ReplicationFactor:1 Configs:min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,cleanup.policy=compact,segment.ms=100
Topic: localhost.dbo.TradeHistory Partition: 0 Leader: 1 Replicas: 1 Isr: 1
В моем тесте я добавляю сообщения в тему localhost.dbo.TradeHistory с TradeId 2, которые просто меняют сумму сделки. Только сумма обновляется;Идентификатор учетной записи остается равным 1.
Сообщения в теме localhost.dbo.TradeHistory
/usr/bin/kafka-console-consumer --bootstrap-server broker:9092 --property print.key=true --topic localhost.dbo.TradeHistory --from-beginning
... (earlier values redacted) ...
2 {"TradeHistoryId":47,"TradeId":2,"AccountId":1,"Spn":1,"Amount":106.0,"__table":"TradeHistory"}
2 {"TradeHistoryId":48,"TradeId":2,"AccountId":1,"Spn":1,"Amount":107.0,"__table":"TradeHistory"}
В приведенном выше дампе темы показано изменение суммы сделки 2 (в учетной записи 1)от 106,0 до 107,0.
Запрос KSQL
ksql> select AccountId, count(*) as Count, sum(Amount) as Total from Trades group by AccountId;
1 | 1 | 106.0
1 | 0 | 0.0
1 | 1 | 107.0
Вопрос в том, почему приведенный выше запрос KSQL возвращает «промежуточное» значение каждый раз, когда я публикую обновление сделки. Как видите, поля Count и Amount показывают 0,0, а затем запрос KSQL немедленно «исправляет» его до 1,107.0. Я немного сбит с толку этим поведением.
Может кто-нибудь объяснить это?
Большое спасибо.