KSQL Query, возвращающий неожиданные значения в простой агрегации - PullRequest
1 голос
/ 13 октября 2019

Я получаю неожиданные результаты от запроса KSQL к KTable, который сам определяется темой Kafka. KTABLE - это «Сделки», и он поддерживается сжатой темой «localhost.dbo.TradeHistory». Предполагается, что он содержит самую последнюю информацию о сделке с акциями, имеющей идентификатор TradeId. Ключ темы - TradeId. Каждая сделка имеет AccountId, и я пытаюсь создать запрос для получения суммы сумм сделок, сгруппированных по счетам.

Определение сделок KTABLE

ksql> create table Trades(TradeId int, AccountId int, Spn int, Amount double) with (KAFKA_TOPIC = 'localhost.dbo.TradeHistory', VALUE_FORMAT = 'JSON', KEY = 'TradeId');

...

ksql> describe extended Trades;

Name                 : TRADES
Type                 : TABLE
Key field            : TRADEID
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : localhost.dbo.TradeHistory (partitions: 1, replication: 1)

Field     | Type
---------------------------------------
ROWTIME   | BIGINT           (system)
ROWKEY    | VARCHAR(STRING)  (system)
TRADEID   | INTEGER
ACCOUNTID | INTEGER
SPN       | INTEGER
AMOUNT    | DOUBLE
---------------------------------------

Local runtime statistics
------------------------
consumer-messages-per-sec:         0 consumer-total-bytes:      3709 consumer-total-messages:        39     last-message: 2019-10-12T20:52:16.552Z

(Statistics of the local KSQL server interaction with the Kafka topic localhost.dbo.TradeHistory)

Конфигурация темы localhost.dbo.TradeHistory

/usr/bin/kafka-topics --zookeeper zookeeper:2181 --describe --topic localhost.dbo.TradeHistory
Topic:localhost.dbo.TradeHistory    PartitionCount:1    ReplicationFactor:1 Configs:min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,cleanup.policy=compact,segment.ms=100
    Topic: localhost.dbo.TradeHistory   Partition: 0    Leader: 1   Replicas: 1 Isr: 1

В моем тесте я добавляю сообщения в тему localhost.dbo.TradeHistory с TradeId 2, которые просто меняют сумму сделки. Только сумма обновляется;Идентификатор учетной записи остается равным 1.

Сообщения в теме localhost.dbo.TradeHistory

/usr/bin/kafka-console-consumer --bootstrap-server broker:9092 --property print.key=true --topic localhost.dbo.TradeHistory --from-beginning

... (earlier values redacted) ...

2   {"TradeHistoryId":47,"TradeId":2,"AccountId":1,"Spn":1,"Amount":106.0,"__table":"TradeHistory"}
2   {"TradeHistoryId":48,"TradeId":2,"AccountId":1,"Spn":1,"Amount":107.0,"__table":"TradeHistory"}

В приведенном выше дампе темы показано изменение суммы сделки 2 (в учетной записи 1)от 106,0 до 107,0.

Запрос KSQL

ksql> select AccountId, count(*) as Count, sum(Amount) as Total from Trades group by AccountId;
1 | 1 | 106.0
1 | 0 | 0.0
1 | 1 | 107.0

Вопрос в том, почему приведенный выше запрос KSQL возвращает «промежуточное» значение каждый раз, когда я публикую обновление сделки. Как видите, поля Count и Amount показывают 0,0, а затем запрос KSQL немедленно «исправляет» его до 1,107.0. Я немного сбит с толку этим поведением.

Может кто-нибудь объяснить это?

Большое спасибо.

1 Ответ

5 голосов
/ 16 октября 2019

Спасибо за ваш вопрос. Я добавил ответ в нашу базу знаний: https://github.com/confluentinc/ksql/pull/3594/files.

Когда KSQL видит обновление существующей строки в таблице, он внутренне генерирует событие CDC, которое содержит старое и новое значение. Агрегаты обрабатывают это, сначала отменяя старое значение, а затем применяя новое значение.

Итак, в приведенном выше примере, когда происходит вторая вставка, KSQL сначала отменяет старое значение. В результате значение COUNT уменьшается на 1, а значение SUM уменьшается на старое значение 106.0, то есть уменьшается до нуля. Затем KSQL применяет новое значение строки, в котором COUNT увеличивается на 1, а SUM увеличивается на новое значение 107.0.

По умолчанию KSQL настроен для буферизации результатов для до 2 секунд или 10 МБ данных перед отправкой результатов в Kafka. Вот почему вы можете увидеть небольшую задержку на выходе при вставке значений в этом примере. Если обе выходные строки буферизуются вместе, то KSQL подавит первый результат. Вот почему вы часто не видите вывод промежуточной строки. Конфигурации commit.interval.ms и cache.max.bytes.buffering, которые установлены на 2 секунды и 10 МБ соответственно, могут использоваться для настройки этого поведения. Установка любого из этих параметров в ноль приведет к тому, что KSQL будет всегда выводить все промежуточные результаты.

Если вы видите, что эти промежуточные результаты выводятся каждый раз, то, скорее всего, вы установили один или оба этих параметра в ноль.

У нас есть проблема Github , чтобы улучшить KSQL, чтобы использовать функциональность подавления Kafka Stream, которая позволила бы пользователям лучше контролировать, как материализуются результаты.

...