Как правильно реализовать скользящую среднюю с помощью KSQL KTable? - PullRequest
0 голосов
/ 16 июня 2019

Я пытаюсь внедрить скользящее среднее по объему в KSQL.

Кафка в настоящее время принимает данные от производителя в тему "KLINES".Эти данные представлены на нескольких рынках в едином формате.Затем я создаю поток из этих данных следующим образом:

CREATE STREAM KLINESTREAM (market VARCHAR, open DOUBLE, high DOUBLE, low DOUBLE, close DOUBLE, volume DOUBLE, start_time BIGINT, close_time BIGINT, event_time BIGINT) \
WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='KLINES', TIMESTAMP='event_time', KEY='market');

Затем я создаю таблицу, которая вычисляет средний объем за последние 20 минут для каждого рынка следующим образом:

CREATE TABLE AVERAGE_VOLUME_TABLE_BY_MARKET AS \
SELECT CEIL(SUM(volume) / COUNT(*)) AS volume_avg, market FROM KLINESTREAM \
WINDOW HOPPING (SIZE 20 MINUTES, ADVANCE BY 5 SECONDS) \ 
GROUP BY market;
SELECT * FROM AVERAGE_VOLUME_TABLE_BY_MARKET LIMIT 1;

Для ясности выдает:

1560647412620 | EXAMPLEMARKET : Window{start=1560647410000 end=-} | 44.0 | EXAMPLEMARKET

Я хочу иметь таблицу KSQL, которая будет представлять текущее состояние "kline" каждого рынка, а также включать скользящий средний объем, рассчитанный в«AVERAGE_VOLUME_TABLE_BY_MARKET» KTable, чтобы я мог выполнить анализ между текущим объемом и средним скользящим объемом

Я попытался присоединиться так:

SELECT K.market, K.open, K.high, K.low, K.close, K.volume, V.volume_avg \
FROM KLINESTREAM K \
LEFT JOIN AVERAGE_VOLUME_TABLE_BY_MARKET V \
ON K.market = V.market;

Но, очевидно, это приводит к ошибкепоскольку ключ «AVERAGE_VOLUME_TABLE_BY_MARKET» включает в себя TimeWindow, а также рынок.

A serializer (key:
    org.apache.kafka.streams.kstream.TimeWindowedSerializer) is not compatible to
    the actual key type (key type: java.lang.String). Change the default Serdes in
    StreamConfig or provide correct Serdes via method parameters.

Правильно ли я подхожу к этой проблеме?

Чего я хочу достичь:

Windowed Aggregate KTable + Kline Stream -> 
KTable representing current market state 
including average volume from the KTable

, который отображает текущее состояние рынка, возможное в KSQL.Или я должен использовать KStreams или другую библиотеку для достижения этой цели?

Здесь приведен отличный пример агрегирования: https://www.confluent.io/stream-processing-cookbook/ksql-recipes/aggregating-data

Применимо к этому примеру, как мне использовать агрегат для сравнения со свежими даннымикак оно появляется в таблице KSQL?

Приветствия, Джеймс

...