K SQL: ключевой столбец таблицы не является столбцом, используемым в критериях объединения - PullRequest
0 голосов
/ 18 февраля 2020

Я хотел бы объединить две таблицы K SQL с отношением один ко многим в столбце, который отличается от ключа.
Как я могу это сделать?
Я приведу пример, контекст, решение, над которым я начал работать, и проблемы, с которыми я столкнулся.

Пример

В одной команде может быть много игроков, и соединение должно быть на teams.name = players.team:

teams:

| name     | founded | arena          | championships |
|----------|---------|----------------|---------------|
| eagles   | 2020    | at&t arena     | 0             |
| vultures | 2019    | verizon center | 1             |

players:

| name     | team     | avg_points | born |
|----------|----------|------------|------|
| alice    | vultures | 30         | 2013 |
| bob      | vultures | 25         | 2015 |
| charlie  | eagles   | 20         | 2014 |

Желаемый результат - players_teams_enriched:

| player   | team     | avg_points | born | team_founded | arena          | team_championships |
|----------|----------|------------|------|--------------|----------------|--------------------|
| alice    | vultures | 30         | 2013 | 2019         | verizon center | 1                  |
| bob      | vultures | 25         | 2015 | 2019         | verizon center | 1                  |
| charlie  | eagles   | 20         | 2014 | 2020         | at&t arena     | 0                  |

Context

Настройка инфраструктуры

В следующем предполагается, что у вас локальная платформа 5.4.0 и mysql db. Вкратце:

# Run confluent platform in docker:
git clone git@github.com:confluentinc/examples
# add mysql driver and connector before starting docker compose, then:
cd examples/cp-all-in-one && docker-compose up -d
# connect as a client:
docker run -it --rm --name ksql-cli-1 --network cp-all-in-one_default confluentinc/cp-ksql-cli:5.4.0 http://ksql-server:8088

# Run mysql in docker:
# create the docker network if it doesn't exist yet
docker network inspect cp-all-in-one_default &>/dev/null || docker network create --driver bridge cp-all-in-one_default
# create the container
docker run --name mysql --network cp-all-in-one_default -p 3306:3306 -e MYSQL_ROOT_PASSWORD=pass -e MYSQL_DATABASE=example -e MYSQL_USER=demouser -e MYSQL_PASSWORD=demopass -d mysql
# connect as a client
docker run -it --network cp-all-in-one_default --name mysql-client --rm mysql mysql -hmysql -udemouser -pdemopass example

Воспроизведение

Таблица teams и players уже существует в моей реляционной базе данных (например, mysql):

CREATE TABLE teams(
    name VARCHAR(128) NOT NULL PRIMARY KEY,
    founded INT NOT NULL,
    arena VARCHAR(128) NOT NULL,
    championships INT NOT NULL,
    lastupdated TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP
);

CREATE TABLE players(
    name VARCHAR(128) NOT NULL PRIMARY KEY,
    team VARCHAR(128) NOT NULL,
    avg_points DOUBLE NOT NULL,
    born INT NOT NULL,
    lastupdated TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP
);

INSERT INTO teams   (name, founded, arena, championships) VALUES ('eagles',   2020, 'at&t arena',     0);
INSERT INTO teams   (name, founded, arena, championships) VALUES ('vultures', 2019, 'verizon center', 1);
INSERT INTO players (name, team, avg_points, born)        VALUES ('alice', 'vultures', 30, 2013);
INSERT INTO players (name, team, avg_points, born)        VALUES ('bob', 'vultures', 25, 2015);
INSERT INTO players (name, team, avg_points, born)        VALUES ('charlie', 'eagles', 20, 2014);

Тогда Я создаю два соединителя с помощью K SQL CLI и создаю две таблицы K SQL (все команды теперь будут в K SQL CLI):

SET 'auto.offset.reset'='earliest';

CREATE SOURCE CONNECTOR teams WITH(
    "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
    "connection.url"='jdbc:mysql://mysql:3306/example?user=demouser&password=demopass',
    "mode"='timestamp',
    "table.whitelist"='teams',
    "timestamp.column.name"='lastupdated',
    "topic.prefix"='jdbc_',
    "transforms"='valueToKey,extractFieldFromKey',
    "transforms.valueToKey.type"='org.apache.kafka.connect.transforms.ValueToKey',
    "transforms.valueToKey.fields"='name',
    "transforms.extractFieldFromKey.type"='org.apache.kafka.connect.transforms.ExtractField$Key',
    "transforms.extractFieldFromKey.field"='name',
    "value.converter"='org.apache.kafka.connect.json.JsonConverter',
    "key.converter.schemas.enable"='false',
    "value.converter.schemas.enable"='false'
);
-- PRINT jdbc_teams FROM BEGINNING;

CREATE SOURCE CONNECTOR players WITH(
    "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
    "connection.url"='jdbc:mysql://mysql:3306/example?user=demouser&password=demopass',
    "mode"='timestamp',
    "table.whitelist"='players',
    "timestamp.column.name"='lastupdated',
    "topic.prefix"='jdbc_',
    "transforms"='valueToKey,extractFieldFromKey',
    "transforms.valueToKey.type"='org.apache.kafka.connect.transforms.ValueToKey',
    "transforms.valueToKey.fields"='name',
    "transforms.extractFieldFromKey.type"='org.apache.kafka.connect.transforms.ExtractField$Key',
    "transforms.extractFieldFromKey.field"='name',
    "value.converter"='org.apache.kafka.connect.json.JsonConverter',
    "key.converter.schemas.enable"='false',
    "value.converter.schemas.enable"='false'
);
-- PRINT jdbc_players FROM BEGINNING;

CREATE TABLE teams(
    name VARCHAR,
    founded INT,
    arena VARCHAR,
    championships INT,
    lastupdated BIGINT
) WITH (
    KAFKA_TOPIC = 'jdbc_teams',
    VALUE_FORMAT='JSON',
    TIMESTAMP='lastupdated',
    KEY = 'name'
);
-- SELECT * FROM teams EMIT CHANGES;

CREATE TABLE players(
    name VARCHAR,
    team VARCHAR,
    avg_points DOUBLE,
    born INT,
    lastupdated BIGINT
) WITH (
    KAFKA_TOPIC = 'jdbc_players',
    VALUE_FORMAT='JSON',
    TIMESTAMP='lastupdated',
    KEY = 'name'
);
-- SELECT * FROM players EMIT CHANGES;

Теперь я хотел бы присоединиться к столам В K SQL CLI:

SELECT
    players.name as player,
    teams.name as team,
    avg_points,
    born,
    teams.founded as team_founded,
    arena,
    teams.championships as team_championships
FROM
    teams INNER JOIN players ON teams.name = players.team
EMIT CHANGES;

Это приводит к ошибке:

Ключевой столбец таблицы источника (PLAYERS) (PLAYERS.NAME) не является столбцом, используемым в критерии присоединения (PLAYERS.TEAM). В критериях объединения поддерживается только ключевой столбец таблицы или «ROWKEY».

Возможное решение: collect_list ()

Одним из решений может быть «массаж» таблицы игроков и ключа это имя команды вместо имени игрока, используя collect_list(COLUMN) и затем (после объединения с таблицей команд) EXPLODE(COLUMN) AS NEW_NAME:

SELECT
    team,
    collect_list(name) as players,
    collect_list(avg_points) as avg_points,
    collect_list(born) as born_dates
FROM players
GROUP BY team
EMIT CHANGES;

Возвращает хорошие результаты:

+---------+-------------+-------------+-------------+
|TEAM     |PLAYERS      |AVG_POINTS   |BORN_DATES   |
+---------+-------------+-------------+-------------+
|vultures |[alice]      |[30.0]       |[2013]       |
|vultures |[alice, bob] |[30.0, 25.0] |[2013, 2015] |
|eagles   |[charlie]    |[20.0]       |[2014]       |

Таким образом, мы можем создать таблицу из запроса выше, а затем объединить команды:

CREATE TABLE teamplayers AS
    SELECT
        team,
        collect_list(name) as players,
        collect_list(avg_points) as avg_points,
        collect_list(born) as born_dates
    FROM players
    GROUP BY team;

CREATE TABLE enriched_team_players AS
    SELECT
        team,
        players,
        avg_points,
        born_dates,
        founded,
        arena,
        championships
    FROM teamplayers INNER JOIN teams ON teamplayers.team = teams.name;

SELECT * FROM enriched_team_players EMIT CHANGES;

Результат действительно близок к желаемому результату, и все, что осталось сделать (если я что-то упустил) это взорвать списки. Вот результат:

+--------------+---------+---------+-------------+-------------+-------------+--------+---------------+--------------+
|ROWTIME       |ROWKEY   |TEAM     |PLAYERS      |AVG_POINTS   |BORN_DATES   |FOUNDED |ARENA          |CHAMPIONSHIPS |
+--------------+---------+---------+-------------+-------------+-------------+--------+---------------+--------------+
|1582017691000 |vultures |vultures |[alice, bob] |[30.0, 25.0] |[2013, 2015] |2019    |verizon center |1             |
|1582017691000 |eagles   |eagles   |[charlie]    |[20.0]       |[2014]       |2020    |at&t arena     |0             |

Теперь я пытаюсь разобрать результаты с помощью следующего запроса:

CREATE TABLE desired_result AS
    SELECT
        EXPLODE(players) as player,
        team,
        EXPLODE(avg_points) as avg_points,
        EXPLODE(born_dates) as born,
        founded as team_founded,
        arena,
        championships as team_championships
    FROM enriched_team_players;

, и я получаю это в выводе:

источник

Как действовать?

Где напечатано это source?

Когда я list tables; не вижу ни одной таблицы с именем desired_result, а когда я list queries;, я не вижу нового запроса, который создает эту таблицу.
Почему бы мне не получить результаты для EXPLODE, как описано в табличных функциях ?

Есть ли другой (возможно, более простой) способ получить желаемый результат, учитывая схему, предоставленную мне в реляционной БД (который я не контролирую)?
Текущий способ использует collect_list , который (для версии 5.4.0 из k sql) поддерживает до 1000 записей в строке.
In В приведенном выше примере ни одна команда не содержит более 1000 игроков, однако в моем реальном случае использования в ~ 5% случаев некоторые строки связаны примерно с 10 000 объектов.
Можно ли настроить предел в 1000?
Есть ли решение, которое не имеет этого ограничения?

1 Ответ

0 голосов
/ 10 марта 2020

похоже, что вы столкнулись с этой ошибкой в ​​бета-версии табличных функций:

https://github.com/confluentinc/ksql/issues/4033

Это исправлено в более поздних версиях K SQL.

Также, на более общей ноте к вашей исходной проблеме ...

В настоящий момент вы смоделировали две исходные таблицы как таблицы в K SQL. Таблицы в K SQL поддерживают только соединения по их первичному ключу. Следовательно, вы сталкиваетесь с тем, что «Только ключевой столбец таблицы или« ROWKEY »поддерживаются в критериях соединения». ошибка.

Рассматривали ли вы моделирование jdbc_players topi c как поток изменений? В отличие от таблиц, поток можно перераспределить, чтобы разрешить требуемое объединение. Это может дать вам то, что вы хотите с версией K SQL, которую вы используете. Недостатком этого подхода является то, что он может некорректно обрабатывать удаления из исходных таблиц.

-- stream of player changes:
CREATE STREAM player_changes(
    name VARCHAR,
    team VARCHAR,
    avg_points DOUBLE,
    born INT,
    lastupdated BIGINT
) WITH (
    KAFKA_TOPIC = 'jdbc_players',
    VALUE_FORMAT='JSON',
    TIMESTAMP='lastupdated'
);

-- join:
SELECT * FROM player_changes p JOIN teams t on p.team = t.name;
...