Правильно ли я конфигурирую сообщения и k sql ключей, чтобы убедиться в правильности моей окончательной агрегации? - PullRequest
1 голос
/ 06 марта 2020

У меня есть topi c, который заполняется разъемом JDB C. Кажется, у него нет ключа сообщения kafka:

ksql> print 'mssql-transaction-log' limit 3;
Format:AVRO
3/6/20 11:40:35 AM UTC, null, {"TransID": 8789405114, "UserID": 15, "ActionCode": 80, "GameName": "thisgame", "GameID": 148362, "DataCashRef": null, "Success": "Y", "StartBalance": 188036, "Amount": -25, "EndBalance": 188011, "BonusSta
rtBalance": 10000, "BonusAmount": 0, "BonusEndBalance": 10000, "Stamp": 1583162921467, "SiteID": 6}

Я создал поток из этого:

CREATE STREAM TRANSACTIONS_LOG_RAW
   (
      TRANSID BIGINT,
      USERID INTEGER,
      ACTIONCODE INTEGER,
      GAMENAME STRING,
      GAMEID BIGINT,
      DATACASHREF STRING,
      SUCCESS STRING,
      STARTBALANCE INTEGER,
      AMOUNT INTEGER,
      ENDBALANCE INTEGER,
      BONUSSTARTBALANCE INTEGER,
      BONUSAMOUNT INTEGER,
      BONUSENDBALANCE INTEGER,
      STAMP BIGINT,
      SITEID INTEGER
  )
  WITH (KAFKA_TOPIC='mssql-transaction-log',
    VALUE_FORMAT='AVRO',
    KEY='USERID');

Я создал отфильтрованный поток из этого:

CREATE STREAM GAME_PURCHASES_RAW AS
    SELECT USERID,
    GAMENAME,
    AMOUNT,
    STAMP,
    TIMESTAMPTOSTRING(STAMP, 'yyyyMMddHH') HOUR_DIMENSION,
    TIMESTAMPTOSTRING(STAMP, 'yyyyMMdd') DAY_DIMENSION
    FROM TRANSACTIONS_LOG_RAW
    WHERE ACTIONCODE = 80
    PARTITION BY USERID;

Когда я проверяю эти сообщения, ключ kafka отсутствует:

ksql> print 'GAME_PURCHASES_RAW' limit 3;
Format:AVRO
3/6/20 11:40:35 AM UTC, null, {"USERID": 58, "GAMENAME": "game", "AMOUNT": -50, "STAMP": 1583162898780, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}
3/6/20 11:40:35 AM UTC, null, {"USERID": 191, "GAMENAME": "game", "AMOUNT": -10, "STAMP": 1583162898780, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}
3/6/20 11:40:35 AM UTC, null, {"USERID": 70, "GAMENAME": "game", "AMOUNT": -10, "STAMP": 1583162898980, "HOUR_DIMENSION": "2020030215", "DAY_DIMENSION": "20200302"}

Когда я описываю поток, он показывает ключ:

ksql> describe GAME_PURCHASES_RAW;

Name                 : GAME_PURCHASES_RAW
 Field          | Type
--------------------------------------------
 ROWTIME        | BIGINT           (system)
 ROWKEY         | VARCHAR(STRING)  (system)
 USERID         | INTEGER          (key)
 GAMENAME       | VARCHAR(STRING)
 AMOUNT         | INTEGER
 STAMP          | BIGINT
 HOUR_DIMENSION | VARCHAR(STRING)
 DAY_DIMENSION  | VARCHAR(STRING)
--------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>

Я собираюсь создать совокупность из этой группы потоков GAME_PURCHASES_RAW по USERID. Я думал, что для агрегатов ключ сообщения kafka не может быть нулевым, потому что мне нужно хранить сообщения для каждого уникального USERID в одном и том же разделе.

Почему поток для GAME_PURCHASES_RAW не показывает ключ в сообщении kafka для topi c он создает?

Правильно ли я конфигурирую сообщение и k sql ключей, чтобы убедиться в правильности моего окончательного агрегирования?

(Я подозреваю, что мое понимание ключей kafka vs k sql на каком-то фундаментальном уровне не хватает ключей потока)

...