У меня есть topi c, который заполняется разъемом JDB C. Кажется, у него нет ключа сообщения kafka:
ksql> print 'mssql-transaction-log' limit 3;
Format:AVRO
3/6/20 11:40:35 AM UTC, null, {"TransID": 8789405114, "UserID": 15, "ActionCode": 80, "GameName": "thisgame", "GameID": 148362, "DataCashRef": null, "Success": "Y", "StartBalance": 188036, "Amount": -25, "EndBalance": 188011, "BonusSta
rtBalance": 10000, "BonusAmount": 0, "BonusEndBalance": 10000, "Stamp": 1583162921467, "SiteID": 6}
Я создал поток из этого:
CREATE STREAM TRANSACTIONS_LOG_RAW
(
TRANSID BIGINT,
USERID INTEGER,
ACTIONCODE INTEGER,
GAMENAME STRING,
GAMEID BIGINT,
DATACASHREF STRING,
SUCCESS STRING,
STARTBALANCE INTEGER,
AMOUNT INTEGER,
ENDBALANCE INTEGER,
BONUSSTARTBALANCE INTEGER,
BONUSAMOUNT INTEGER,
BONUSENDBALANCE INTEGER,
STAMP BIGINT,
SITEID INTEGER
)
WITH (KAFKA_TOPIC='mssql-transaction-log',
VALUE_FORMAT='AVRO',
KEY='USERID');
Я создал отфильтрованный поток из этого:
CREATE STREAM GAME_PURCHASES_RAW AS
SELECT USERID,
GAMENAME,
AMOUNT,
STAMP,
TIMESTAMPTOSTRING(STAMP, 'yyyyMMddHH') HOUR_DIMENSION,
TIMESTAMPTOSTRING(STAMP, 'yyyyMMdd') DAY_DIMENSION
FROM TRANSACTIONS_LOG_RAW
WHERE ACTIONCODE = 80
PARTITION BY USERID;
Когда я проверяю эти сообщения, ключ kafka отсутствует:
ksql> print 'GAME_PURCHASES_RAW' limit 3;
Format:AVRO
3/6/20 11:40:35 AM UTC, null, {"USERID": 58, "GAMENAME": "game", "AMOUNT": -50, "STAMP": 1583162898780, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}
3/6/20 11:40:35 AM UTC, null, {"USERID": 191, "GAMENAME": "game", "AMOUNT": -10, "STAMP": 1583162898780, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}
3/6/20 11:40:35 AM UTC, null, {"USERID": 70, "GAMENAME": "game", "AMOUNT": -10, "STAMP": 1583162898980, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}
Когда я описываю поток, он показывает ключ:
ksql> describe GAME_PURCHASES_RAW;
Name : GAME_PURCHASES_RAW
Field | Type
--------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
USERID | INTEGER (key)
GAMENAME | VARCHAR(STRING)
AMOUNT | INTEGER
STAMP | BIGINT
HOUR_DIMENSION | VARCHAR(STRING)
DAY_DIMENSION | VARCHAR(STRING)
--------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>
Я собираюсь создать совокупность из этой группы потоков GAME_PURCHASES_RAW по USERID. Я думал, что для агрегатов ключ сообщения kafka не может быть нулевым, потому что мне нужно хранить сообщения для каждого уникального USERID в одном и том же разделе.
Почему поток для GAME_PURCHASES_RAW не показывает ключ в сообщении kafka для topi c он создает?
Правильно ли я конфигурирую сообщение и k sql ключей, чтобы убедиться в правильности моего окончательного агрегирования?
(Я подозреваю, что мое понимание ключей kafka vs k sql на каком-то фундаментальном уровне не хватает ключей потока)