Я исследую некоторые вещи для работы, где мы должны доказать ценность Kafka и KSQL как части POC. Мы работаем над простыми игровыми событиями, когда игрок может начать игру, а игровой движок либо назначает им очки (считается победой), либо они ничего не получают (считаются проигрышем). Одно из наших требований - доказать выполнимость KSQL при последовательных проигрышах или выигрышах. Для целей этого вопроса я сосредоточусь на последовательных проигрышах.
У нас есть две таблицы: одна для побед и одна для потерь, которые идут против одного и того же потока игровых событий.
DDL выигрышей выглядит следующим образом:
CREATE TABLE TABLE__GAME_EVENTS__WINS AS
SELECT userId,
gameName,
COUNT(*) AS totalWins
FROM STREAM__GAME_EVENTS
WINDOW TUMBLING (SIZE 30 MINUTES)
WHERE pointsAllocated > 0
GROUP BY userId,
gameName;
DDL потерь выглядит следующим образом:
CREATE TABLE TABLE__GAME_EVENTS__LOSSES AS
SELECT userId,
gameName,
COUNT(*) AS totalLosses
FROM STREAM__GAME_EVENTS
WINDOW TUMBLING (SIZE 30 MINUTES)
WHERE pointsAllocated = 0
GROUP BY userId,
gameName;
Для достижения последовательных потерь в течение 30-минутного периода окнанаш текущий подход заключается в том, чтобы JOIN
таблица WINS
на таблицу LOSSES
и запускать только тогда, когда в таблице WINS
ничего нет и что-то в таблице LOSSES
.
Выполнениечто у нас есть следующий код:
SELECT losses.userId,
losses.gameName,
losses.totalLosses
FROM TABLE__GAME_EVENTS__LOSSES losses
JOIN TABLE__GAME_EVENTS__WINS wins
ON wins.userId = losses.userId
AND wins.gameName = losses.gameName
WHERE losses.totalLosses >= 1
AND wins.totalWins = 0;
Когда я пытаюсь выполнить этот код, я получаю следующую ошибку: io.confluent.ksql.parser.tree.LogicalBinaryExpression cannot be cast to io.confluent.ksql.parser.tree.ComparisonExpression
Первоначально я думал, что это было связано с WHERE
предложение, но после рассмотрения ответа Ходжата на аналогичный вопрос выясняется, что проблема связана с оператором AND
в JOIN
.
Когда я запускаю DESCRIBE EXTENDED
для обеих таблиц, я получаю следующее (одинаковое для обоих):
ksql> DESCRIBE EXTENDED TABLE__GAME_EVENTS__WINS;
Name : TABLE__GAME_EVENTS__WINS
Type : TABLE
Key field : KSQL_INTERNAL_COL_0|+|KSQL_INTERNAL_COL_1
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : TABLE__GAME_EVENTS__WINS (partitions: 4, replication: 1)
Field | Type
---------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
USERID | INTEGER
GAMENAME | VARCHAR(STRING)
TOTALWINS | BIGINT
---------------------------------------
Учитывая, что COL_0
- userId
и COL_1
- gameName
являются ключевыми полями в обеих таблицах, кажется логичным, что они имеют для включения в JOIN
. Можно ли обойти это или в настоящее время невозможно JOIN
для нескольких столбцов?