Дублированные ключи в таблице k sql - PullRequest
0 голосов
/ 12 марта 2020

Я пытаюсь создать таблицу sql, чтобы сохранить последнюю версию сущности. Я хотел бы знать, каков рекомендуемый подход для этого.

До сих пор я пробовал следующее: я загружал счета в kafka через debezium, а затем создал следующий поток, чтобы иметь возможность использовать их из k sql:

ksql> create stream invoice_stream with (kafka_topic='dbserver1.invoices.invoice', value_format='AVRO');

Дебезиум добавляет данные о состоянии строки таблицы базы данных до и после, что делает его немного сложным в использовании, поэтому я создал еще один поток сверху, чтобы получать только те данные, которые у меня есть. интересует:

create stream invoice 
      with (kafka_topic='invoice', value_format='AVRO')
      as
      select i.before->id as before_id,
             i.after->id as after_id,
             ifnull(i.transaction->id, 'NA') as transaction_id,
             i.after->description as description,
             i.after->invoice_date as invoice_date,
             i.after->status as status
      from invoice_stream i;

Пока все хорошо, я могу запросить поток с помощью запроса pu sh и посмотреть, что ожидается:

ksql> select * from invoice emit changes;
+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|ROWTIME                 |ROWKEY                  |BEFORE_ID               |AFTER_ID                |TRANSACTION_ID          |DESCRIPTION             |INVOICE_DATE            |STATUS                  |
+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|1583961059498           |                        |null                    |1                       |NA                      |Invoice A               |18201                   |N                       |
|1583961059499           |                        |null                    |2                       |NA                      |Invoice B               |18205                   |N                       |
|1583961059499           |                        |null                    |3                       |NA                      |Invoice C               |18210                   |N                       |
|1583961059499           |                        |null                    |4                       |NA                      |Invoice D               |18215                   |N                       |
|1583961263233           |                        |null                    |5                       |623                     |test line added later   |18263                   |N                       |
|1584007291546           |                        |5                       |5                       |625                     |test line added later   |18263                   |P                       |

Поскольку ключа нет, Я создал еще один поток сверху, где я определяю раздел:

ksql> create stream invoice_rekeyed as select * from invoice partition by after_id;
ksql> describe invoice_rekeyed;

Name                 : INVOICE_REKEYED
 Field          | Type                      
--------------------------------------------
 ROWTIME        | BIGINT           (system) 
 ROWKEY         | VARCHAR(STRING)  (system) 
 BEFORE_ID      | INTEGER                   
 AFTER_ID       | INTEGER          (key)    
 TRANSACTION_ID | VARCHAR(STRING)           
 DESCRIPTION    | VARCHAR(STRING)           
 INVOICE_DATE   | INTEGER                   
 STATUS         | VARCHAR(STRING)           
--------------------------------------------

Наконец, я создал таблицу, подобную этой:

create table invoice_table(before_id int, after_id int, transaction_id string, description string, invoice_date int, status string) 
with (kafka_topic='INVOICE_REKEYED', key='after_id', value_format='AVRO');

Так что в этот момент я ожидал возможности запроса таблица за строкой, однако я получаю следующее сообщение:

ksql> select * from invoice_table where rowkey = 5;
Table 'INVOICE_TABLE' is not materialized. Refer to https://cnfl.io/queries for info on query types. If you intended to issue a push query, resubmit with the EMIT CHANGES clause
 KSQL currently only supports pull queries on materialized aggregate tables. i.e. those created by a 'CREATE TABLE AS SELECT <fields>, <aggregate_functions> FROM <sources> GROUP BY <key>' style statement.
Query syntax in KSQL has changed. There are now two broad categories of queries:
- Pull queries: query the current state of the system, return a result, and terminate. 
- Push queries: query the state of the system in motion and continue to output results until they meet a LIMIT condition or are terminated by the user.

'EMIT CHANGES' is used to to indicate a query is a push query. To convert a pull query into a push query, which was the default behavior in older versions of KSQL, add `EMIT CHANGES` to the end of the statement 
before any LIMIT clause.

For example, the following are pull queries:
    'SELECT * FROM X WHERE ROWKEY=Y;' (non-windowed table)
    'SELECT * FROM X WHERE ROWKEY=Y AND WINDOWSTART>=Z;' (windowed table)

The following is a push query:
    'SELECT * FROM X EMIT CHANGES;'

Note: Persistent queries, e.g. `CREATE TABLE AS ...`, have an implicit `EMIT CHANGES`, but we recommend adding `EMIT CHANGES` to these statements.

Кроме того, если я запрашиваю его как запрос pu sh, я вижу более одной строки для ключа 5:

ksql> select * from invoice_table emit changes;
+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|ROWTIME                 |ROWKEY                  |BEFORE_ID               |AFTER_ID                |TRANSACTION_ID          |DESCRIPTION             |INVOICE_DATE            |STATUS                  |
+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|1583961059498           |1                       |null                    |1                       |NA                      |Invoice A               |18201                   |N                       |
|1583961059499           |2                       |null                    |2                       |NA                      |Invoice B               |18205                   |N                       |
|1583961059499           |3                       |null                    |3                       |NA                      |Invoice C               |18210                   |N                       |
|1583961059499           |4                       |null                    |4                       |NA                      |Invoice D               |18215                   |N                       |
|1583961263233           |5                       |null                    |5                       |623                     |test line added later   |18263                   |N                       |
|1584007291546           |5                       |5                       |5                       |625                     |test line added later   |18263                   |P                       |

Хотелось бы понять, почему стол не материализовался, так как согласно предыдущему сообщению мне кажется, что я не могу начать запрашивать таблицу по ключу строки.

Заранее спасибо

UPDATE

Пытаясь на примере, указанном Робином, я действительно получил ожидаемое поведение; в этом примере при выполнении запроса при обновлении исходной строки БД появляются изменения:

ksql> select * from customers where id = 5 emit changes;
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|ROWTIME          |ROWKEY           |ID               |FIRST_NAME       |LAST_NAME        |EMAIL            |GENDER           |CLUB_STATUS      |COMMENTS         |CREATE_TS        |UPDATE_TS        |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|1584102664415    |5                |5                |Hansiain         |Coda             |hcoda4@senate.gov|Male             |platinum         |Centralized full-|2020-03-13T12:29:|2020-03-13T12:29:|
|                 |                 |                 |                 |                 |                 |                 |                 |range approach   |53Z              |53Z              |
|1584102741712    |5                |5                |Rodrigo          |Coda             |hcoda4@senate.gov|Male             |platinum         |Centralized full-|2020-03-13T12:29:|2020-03-13T12:32:|
|                 |                 |                 |                 |                 |                 |                 |                 |range approach   |53Z              |21Z              |

Однако, если запрос завершается и выполняется снова, доступна только последняя версия:

ksql> select * from customers where id = 5 emit changes;
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|ROWTIME          |ROWKEY           |ID               |FIRST_NAME       |LAST_NAME        |EMAIL            |GENDER           |CLUB_STATUS      |COMMENTS         |CREATE_TS        |UPDATE_TS        |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|1584102741712    |5                |5                |Rodrigo          |Coda             |hcoda4@senate.gov|Male             |platinum         |Centralized full-|2020-03-13T12:29:|2020-03-13T12:32:|
|                 |                 |                 |                 |                 |                 |                 |                 |range approach   |53Z              |21Z              |

Однако, делая в принципе то же самое в моем примере, всегда возвращайте все версии строки:

ksql> print 'dbserver1.invoices.invoice' from beginning limit 50;
Format:AVRO
3/13/20 12:23:09 PM UTC, 1, {"id": 1, "description": "Invoice A", "invoice_date": 18201, "status": "N", "__op": "r", "__ts_ms": 1584102188934, "__transaction_id": null}
3/13/20 12:23:09 PM UTC, 2, {"id": 2, "description": "Invoice B", "invoice_date": 18205, "status": "N", "__op": "r", "__ts_ms": 1584102188936, "__transaction_id": null}
3/13/20 12:23:09 PM UTC, 3, {"id": 3, "description": "Invoice C", "invoice_date": 18210, "status": "N", "__op": "r", "__ts_ms": 1584102188938, "__transaction_id": null}
3/13/20 12:23:09 PM UTC, 4, {"id": 4, "description": "Invoice D", "invoice_date": 18215, "status": "N", "__op": "r", "__ts_ms": 1584102188938, "__transaction_id": null}
^CTopic printing ceased
ksql> create table invoice_table with (kafka_topic='dbserver1.invoices.invoice', value_format='AVRO');

 Message       
---------------
 Table created 
---------------

ksql> select * from invoice_table where id = 4 emit changes;
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|ROWTIME              |ROWKEY               |ID                   |DESCRIPTION          |INVOICE_DATE         |STATUS               |__OP                 |__TS_MS              |__TRANSACTION_ID     |
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|1584102189675        |4                    |4                    |Invoice D            |18215                |N                    |r                    |1584102188938        |null                 |
|1584102365378        |4                    |4                    |Invoice D UPDATED    |18215                |N                    |u                    |1584102365128        |623                  |
^CQuery terminated
ksql> select * from invoice_table where id = 4 emit changes;
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|ROWTIME              |ROWKEY               |ID                   |DESCRIPTION          |INVOICE_DATE         |STATUS               |__OP                 |__TS_MS              |__TRANSACTION_ID     |
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|1584102189675        |4                    |4                    |Invoice D            |18215                |N                    |r                    |1584102188938        |null                 |
|1584102365378        |4                    |4                    |Invoice D UPDATED    |18215                |N                    |u                    |1584102365128        |623                  |

Любая идея о любой конфигурации, которая может изменить эту ситуацию в поведении?

1 Ответ

1 голос
/ 12 марта 2020

Несколько вещей, которые можно отменить и помочь вам в этом, но ответ верхнего уровня: Запросы извлечения не поддерживаются для нематериализованных таблиц , и вы не материализовались, и вы не можете до # 3985 доставлено.

Как вы видели, вы можете выполнить запрос pu sh к таблице. Несколько выходов, которые вы видите, изменяются до состояния. Если вы отмените свой запрос pu sh и повторно запустите его, вы увидите только одно состояние для каждого ключа.


Дебезиум добавляет данные о состоянии строки таблицы базы данных до и после того, что делает его немного сложным в использовании

Проверьте io.debezium.transforms.ExtractNewRecordState Single Преобразование сообщений, которое сгладит полезную нагрузку и поместит в ваше сообщение только текущее состояние

'transforms'= 'unwrap',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',

Поскольку ключа нет, я создал еще один поток сверху, где я указываю раздел:

Это один из подходов, но лучше установить ключ как часть захвата Kafka Connect

'transforms'= 'extractkey',
'transforms.extractkey.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractkey.field'= 'id',
'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',

. out этот недавний семинар QCon .

...