Я хотел бы объединить две таблицы K SQL с отношением один ко многим в столбце, который отличается от ключа.
Как я могу это сделать?
Я приведу пример, контекст, решение, над которым я начал работать, и проблемы, с которыми я столкнулся.
Пример
В одной команде может быть много игроков, и соединение должно быть на teams.name = players.team
:
teams
:
| name | founded | arena | championships |
|----------|---------|----------------|---------------|
| eagles | 2020 | at&t arena | 0 |
| vultures | 2019 | verizon center | 1 |
players
:
| name | team | avg_points | born |
|----------|----------|------------|------|
| alice | vultures | 30 | 2013 |
| bob | vultures | 25 | 2015 |
| charlie | eagles | 20 | 2014 |
Желаемый результат - players_teams_enriched
:
| player | team | avg_points | born | team_founded | arena | team_championships |
|----------|----------|------------|------|--------------|----------------|--------------------|
| alice | vultures | 30 | 2013 | 2019 | verizon center | 1 |
| bob | vultures | 25 | 2015 | 2019 | verizon center | 1 |
| charlie | eagles | 20 | 2014 | 2020 | at&t arena | 0 |
Context
Настройка инфраструктуры
В следующем предполагается, что у вас локальная платформа 5.4.0 и mysql db. Вкратце:
# Run confluent platform in docker:
git clone git@github.com:confluentinc/examples
# add mysql driver and connector before starting docker compose, then:
cd examples/cp-all-in-one && docker-compose up -d
# connect as a client:
docker run -it --rm --name ksql-cli-1 --network cp-all-in-one_default confluentinc/cp-ksql-cli:5.4.0 http://ksql-server:8088
# Run mysql in docker:
# create the docker network if it doesn't exist yet
docker network inspect cp-all-in-one_default &>/dev/null || docker network create --driver bridge cp-all-in-one_default
# create the container
docker run --name mysql --network cp-all-in-one_default -p 3306:3306 -e MYSQL_ROOT_PASSWORD=pass -e MYSQL_DATABASE=example -e MYSQL_USER=demouser -e MYSQL_PASSWORD=demopass -d mysql
# connect as a client
docker run -it --network cp-all-in-one_default --name mysql-client --rm mysql mysql -hmysql -udemouser -pdemopass example
Воспроизведение
Таблица teams
и players
уже существует в моей реляционной базе данных (например, mysql):
CREATE TABLE teams(
name VARCHAR(128) NOT NULL PRIMARY KEY,
founded INT NOT NULL,
arena VARCHAR(128) NOT NULL,
championships INT NOT NULL,
lastupdated TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP
);
CREATE TABLE players(
name VARCHAR(128) NOT NULL PRIMARY KEY,
team VARCHAR(128) NOT NULL,
avg_points DOUBLE NOT NULL,
born INT NOT NULL,
lastupdated TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP
);
INSERT INTO teams (name, founded, arena, championships) VALUES ('eagles', 2020, 'at&t arena', 0);
INSERT INTO teams (name, founded, arena, championships) VALUES ('vultures', 2019, 'verizon center', 1);
INSERT INTO players (name, team, avg_points, born) VALUES ('alice', 'vultures', 30, 2013);
INSERT INTO players (name, team, avg_points, born) VALUES ('bob', 'vultures', 25, 2015);
INSERT INTO players (name, team, avg_points, born) VALUES ('charlie', 'eagles', 20, 2014);
Тогда Я создаю два соединителя с помощью K SQL CLI и создаю две таблицы K SQL (все команды теперь будут в K SQL CLI):
SET 'auto.offset.reset'='earliest';
CREATE SOURCE CONNECTOR teams WITH(
"connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
"connection.url"='jdbc:mysql://mysql:3306/example?user=demouser&password=demopass',
"mode"='timestamp',
"table.whitelist"='teams',
"timestamp.column.name"='lastupdated',
"topic.prefix"='jdbc_',
"transforms"='valueToKey,extractFieldFromKey',
"transforms.valueToKey.type"='org.apache.kafka.connect.transforms.ValueToKey',
"transforms.valueToKey.fields"='name',
"transforms.extractFieldFromKey.type"='org.apache.kafka.connect.transforms.ExtractField$Key',
"transforms.extractFieldFromKey.field"='name',
"value.converter"='org.apache.kafka.connect.json.JsonConverter',
"key.converter.schemas.enable"='false',
"value.converter.schemas.enable"='false'
);
-- PRINT jdbc_teams FROM BEGINNING;
CREATE SOURCE CONNECTOR players WITH(
"connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
"connection.url"='jdbc:mysql://mysql:3306/example?user=demouser&password=demopass',
"mode"='timestamp',
"table.whitelist"='players',
"timestamp.column.name"='lastupdated',
"topic.prefix"='jdbc_',
"transforms"='valueToKey,extractFieldFromKey',
"transforms.valueToKey.type"='org.apache.kafka.connect.transforms.ValueToKey',
"transforms.valueToKey.fields"='name',
"transforms.extractFieldFromKey.type"='org.apache.kafka.connect.transforms.ExtractField$Key',
"transforms.extractFieldFromKey.field"='name',
"value.converter"='org.apache.kafka.connect.json.JsonConverter',
"key.converter.schemas.enable"='false',
"value.converter.schemas.enable"='false'
);
-- PRINT jdbc_players FROM BEGINNING;
CREATE TABLE teams(
name VARCHAR,
founded INT,
arena VARCHAR,
championships INT,
lastupdated BIGINT
) WITH (
KAFKA_TOPIC = 'jdbc_teams',
VALUE_FORMAT='JSON',
TIMESTAMP='lastupdated',
KEY = 'name'
);
-- SELECT * FROM teams EMIT CHANGES;
CREATE TABLE players(
name VARCHAR,
team VARCHAR,
avg_points DOUBLE,
born INT,
lastupdated BIGINT
) WITH (
KAFKA_TOPIC = 'jdbc_players',
VALUE_FORMAT='JSON',
TIMESTAMP='lastupdated',
KEY = 'name'
);
-- SELECT * FROM players EMIT CHANGES;
Теперь я хотел бы присоединиться к столам В K SQL CLI:
SELECT
players.name as player,
teams.name as team,
avg_points,
born,
teams.founded as team_founded,
arena,
teams.championships as team_championships
FROM
teams INNER JOIN players ON teams.name = players.team
EMIT CHANGES;
Это приводит к ошибке:
Ключевой столбец таблицы источника (PLAYERS) (PLAYERS.NAME) не является столбцом, используемым в критерии присоединения (PLAYERS.TEAM). В критериях объединения поддерживается только ключевой столбец таблицы или «ROWKEY».
Возможное решение: collect_list ()
Одним из решений может быть «массаж» таблицы игроков и ключа это имя команды вместо имени игрока, используя collect_list(COLUMN)
и затем (после объединения с таблицей команд) EXPLODE(COLUMN) AS NEW_NAME
:
SELECT
team,
collect_list(name) as players,
collect_list(avg_points) as avg_points,
collect_list(born) as born_dates
FROM players
GROUP BY team
EMIT CHANGES;
Возвращает хорошие результаты:
+---------+-------------+-------------+-------------+
|TEAM |PLAYERS |AVG_POINTS |BORN_DATES |
+---------+-------------+-------------+-------------+
|vultures |[alice] |[30.0] |[2013] |
|vultures |[alice, bob] |[30.0, 25.0] |[2013, 2015] |
|eagles |[charlie] |[20.0] |[2014] |
Таким образом, мы можем создать таблицу из запроса выше, а затем объединить команды:
CREATE TABLE teamplayers AS
SELECT
team,
collect_list(name) as players,
collect_list(avg_points) as avg_points,
collect_list(born) as born_dates
FROM players
GROUP BY team;
CREATE TABLE enriched_team_players AS
SELECT
team,
players,
avg_points,
born_dates,
founded,
arena,
championships
FROM teamplayers INNER JOIN teams ON teamplayers.team = teams.name;
SELECT * FROM enriched_team_players EMIT CHANGES;
Результат действительно близок к желаемому результату, и все, что осталось сделать (если я что-то упустил) это взорвать списки. Вот результат:
+--------------+---------+---------+-------------+-------------+-------------+--------+---------------+--------------+
|ROWTIME |ROWKEY |TEAM |PLAYERS |AVG_POINTS |BORN_DATES |FOUNDED |ARENA |CHAMPIONSHIPS |
+--------------+---------+---------+-------------+-------------+-------------+--------+---------------+--------------+
|1582017691000 |vultures |vultures |[alice, bob] |[30.0, 25.0] |[2013, 2015] |2019 |verizon center |1 |
|1582017691000 |eagles |eagles |[charlie] |[20.0] |[2014] |2020 |at&t arena |0 |
Теперь я пытаюсь разобрать результаты с помощью следующего запроса:
CREATE TABLE desired_result AS
SELECT
EXPLODE(players) as player,
team,
EXPLODE(avg_points) as avg_points,
EXPLODE(born_dates) as born,
founded as team_founded,
arena,
championships as team_championships
FROM enriched_team_players;
, и я получаю это в выводе:
источник
Как действовать?
Где напечатано это source
?
Когда я list tables;
не вижу ни одной таблицы с именем desired_result
, а когда я list queries;
, я не вижу нового запроса, который создает эту таблицу.
Почему бы мне не получить результаты для EXPLODE
, как описано в табличных функциях ?
Есть ли другой (возможно, более простой) способ получить желаемый результат, учитывая схему, предоставленную мне в реляционной БД (который я не контролирую)?
Текущий способ использует collect_list , который (для версии 5.4.0 из k sql) поддерживает до 1000 записей в строке.
In В приведенном выше примере ни одна команда не содержит более 1000 игроков, однако в моем реальном случае использования в ~ 5% случаев некоторые строки связаны примерно с 10 000 объектов.
Можно ли настроить предел в 1000?
Есть ли решение, которое не имеет этого ограничения?