Confluent 4.1.0 -> KSQL: соединение STREAM-TABLE -> данные таблицы null - PullRequest
0 голосов
/ 30 апреля 2018

ШАГ 1: Запустите производителя для создания образца данных

./bin/kafka-avro-console-producer \
         --broker-list localhost:9092 --topic stream-test-topic \
         --property schema.registry.url=http://localhost:8081 \
         --property value.schema='{"type":"record","name":"dealRecord","fields":[{"name":"DEAL_ID","type":"string"},{"name":"DEAL_EXPENSE_CODE","type":"string"},{"name":"DEAL_BRANCH","type":"string"}]}'

Пример данных:

{"DEAL_ID":"deal002", "DEAL_EXPENSE_CODE":"EXP002", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal003", "DEAL_EXPENSE_CODE":"EXP003", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal004", "DEAL_EXPENSE_CODE":"EXP004", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal005", "DEAL_EXPENSE_CODE":"EXP005", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal006", "DEAL_EXPENSE_CODE":"EXP006", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal007", "DEAL_EXPENSE_CODE":"EXP001", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal008", "DEAL_EXPENSE_CODE":"EXP002", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal009", "DEAL_EXPENSE_CODE":"EXP003", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal010", "DEAL_EXPENSE_CODE":"EXP004", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal011", "DEAL_EXPENSE_CODE":"EXP005", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal012", "DEAL_EXPENSE_CODE":"EXP006", "DEAL_BRANCH":"AMSTERDAM"}

ШАГ 2: Откройте другой терминал и запустите потребителя, чтобы проверить данные.

./bin/kafka-avro-console-consumer --topic stream-test-topic \
         --bootstrap-server localhost:9092 \
         --property schema.registry.url=http://localhost:8081 \
         --from-beginning

ШАГ 3: Откройте другой терминал и запустите производителя.

./bin/kafka-avro-console-producer \
         --broker-list localhost:9092 --topic expense-test-topic \
--property "parse.key=true" \
--property "key.separator=:" \
--property schema.registry.url=http://localhost:8081 \
--property key.schema='"string"' \
         --property value.schema='{"type":"record","name":"dealRecord","fields":[{"name":"EXPENSE_CODE","type":"string"},{"name":"EXPENSE_DESC","type":"string"}]}'

Данные:

"pk1":{"EXPENSE_CODE":"EXP001", "EXPENSE_DESC":"Regulatory Deposit"}
"pk2":{"EXPENSE_CODE":"EXP002", "EXPENSE_DESC":"ABC - Sofia"}
"pk3":{"EXPENSE_CODE":"EXP003", "EXPENSE_DESC":"Apple Corporation"}
"pk4":{"EXPENSE_CODE":"EXP004", "EXPENSE_DESC":"Confluent Europe"}
"pk5":{"EXPENSE_CODE":"EXP005", "EXPENSE_DESC":"Air India"}
"pk6":{"EXPENSE_CODE":"EXP006", "EXPENSE_DESC":"KLM International"}

ШАГ 4: Откройте другой терминал и запустите потребителя

./bin/kafka-avro-console-consumer --topic expense-test-topic \
         --bootstrap-server localhost:9092 \
--property "parse.key=true" \
--property "key.separator=:" \
--property schema.registry.url=http://localhost:8081 \
         --from-beginning

ШАГ 5: Войти в клиент KSQL.

./bin/ksql http://localhost:8088

создать следующий поток и таблицу и выполнить запрос на соединение.

KSQL:

STREAM:

    CREATE STREAM SAMPLE_STREAM 
       (DEAL_ID VARCHAR, DEAL_EXPENSE_CODE varchar, DEAL_BRANCH VARCHAR) 
       WITH (kafka_topic='stream-test-topic',value_format='AVRO', key = 'DEAL_ID');

Таблица:

CREATE TABLE SAMPLE_TABLE 
   (EXPENSE_CODE varchar, EXPENSE_DESC VARCHAR)
   WITH (kafka_topic='expense-test-topic',value_format='AVRO', key = 'EXPENSE_CODE');

СЛЕДУЕТ ВЫХОДУ:

ksql> SELECT STREAM1.DEAL_EXPENSE_CODE, TABLE1.EXPENSE_DESC 
       from SAMPLE_STREAM STREAM1 LEFT JOIN SAMPLE_TABLE TABLE1 
       ON STREAM1.DEAL_EXPENSE_CODE = TABLE1.EXPENSE_CODE  
       WINDOW TUMBLING (SIZE 3 MINUTE) 
       GROUP BY STREAM1.DEAL_EXPENSE_CODE, TABLE1.EXPENSE_DESC;

EXP001 | null
EXP001 | null
EXP002 | null
EXP003 | null
EXP004 | null
EXP005 | null
EXP006 | null
EXP002 | null
EXP002 | null

1 Ответ

0 голосов
/ 30 апреля 2018

tl; dr: данные вашей таблицы должны быть введены в столбец, к которому вы присоединяетесь.

Используя приведенные выше примеры данных, вы узнаете, как их устранить

  1. Используйте KSQL для проверки данных в темах (не нужно kafka-avro-console-consumer). Формат выходных данных: метка времени, ключ, значение

    • stream:

      ksql> print 'stream-test-topic' from beginning;
      Format:AVRO
      30/04/18 15:59:13 BST, null, {"DEAL_ID": "deal002", "DEAL_EXPENSE_CODE": "EXP002", "DEAL_BRANCH": "AMSTERDAM"}
      30/04/18 15:59:13 BST, null, {"DEAL_ID": "deal003", "DEAL_EXPENSE_CODE": "EXP003", "DEAL_BRANCH": "AMSTERDAM"}
      30/04/18 15:59:13 BST, null, {"DEAL_ID": "deal004", "DEAL_EXPENSE_CODE": "EXP004", "DEAL_BRANCH": "AMSTERDAM"}
      
    • table:

      ksql> print 'expense-test-topic' from beginning;
      Format:AVRO
      30/04/18 16:10:52 BST, pk1, {"EXPENSE_CODE": "EXP001", "EXPENSE_DESC": "Regulatory Deposit"}
      30/04/18 16:10:52 BST, pk2, {"EXPENSE_CODE": "EXP002", "EXPENSE_DESC": "ABC - Sofia"}
      30/04/18 16:10:52 BST, pk3, {"EXPENSE_CODE": "EXP003", "EXPENSE_DESC": "Apple Corporation"}
      30/04/18 16:10:52 BST, pk4, {"EXPENSE_CODE": "EXP004", "EXPENSE_DESC": "Confluent Europe"}
      30/04/18 16:10:52 BST, pk5, {"EXPENSE_CODE": "EXP005", "EXPENSE_DESC": "Air India"}
      30/04/18 16:10:52 BST, pk6, {"EXPENSE_CODE": "EXP006", "EXPENSE_DESC": "KLM International"}
      

    На этом этапе обратите внимание, что ключ (pk<x>) не соответствует столбцу, к которому мы будем присоединяться

  2. Зарегистрируйте две темы:

    ksql> CREATE STREAM deals WITH (KAFKA_TOPIC='stream-test-topic', VALUE_FORMAT='AVRO');
    
     Message
    ----------------
     Stream created
    ----------------
    
    ksql> CREATE TABLE expense_codes_table WITH (KAFKA_TOPIC='expense-test-topic', VALUE_FORMAT='AVRO', KEY='EXPENSE_CODE');
    
     Message
    ---------------
     Table created
    ---------------
    
  3. Скажите KSQL запрашивать события с начала каждой темы

    ksql> SET 'auto.offset.reset' = 'earliest';
    Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
    
  4. Проверка того, что объявленный ключ таблицы согласно DDL (KEY='EXPENSE_CODE') соответствует фактическому ключу базовых сообщений Kafka (доступно через системный столбец ROWKEY):

    ksql> SELECT ROWKEY, EXPENSE_CODE FROM expense_codes_table;
    pk1 | EXP001
    pk2 | EXP002
    pk3 | EXP003
    pk4 | EXP004
    pk5 | EXP005
    pk6 | EXP006
    

    Ключи не совпадают. Наше объединение обречено!

  5. Волшебный обходной путь - давайте перепишем тему, используя KSQL!

    • Зарегистрировать исходную тему таблицы как KSQL STREAM:

      ksql> CREATE STREAM expense_codes_stream WITH (KAFKA_TOPIC='expense-test-topic', VALUE_FORMAT='AVRO');
      
       Message
      ----------------
       Stream created
      ----------------
      
    • Создайте производный поток, набрав правильный столбец. Это подкреплено измененной темой Кафки.

      ksql> CREATE STREAM EXPENSE_CODES_REKEY AS SELECT * FROM expense_codes_stream PARTITION BY EXPENSE_CODE;
      
       Message
      ----------------------------
       Stream created and running
      ----------------------------
      
    • Перерегистрируйте KSQL _TABLE_ поверх новой темы:

      ksql> DROP TABLE expense_codes_table;
      
       Message
      ----------------------------------------
       Source EXPENSE_CODES_TABLE was dropped
      ----------------------------------------
      ksql> CREATE TABLE expense_codes_table WITH (KAFKA_TOPIC='EXPENSE_CODES_REKEY', VALUE_FORMAT='AVRO', KEY='EXPENSE_CODE');
      
       Message
      ---------------
       Table created
      ---------------
      
    • Проверьте соответствие ключей (объявленных и сообщенных) новой таблице:

      ksql> SELECT ROWKEY, EXPENSE_CODE FROM expense_codes_table;
      EXP005 | EXP005
      EXP001 | EXP001
      EXP002 | EXP002
      EXP003 | EXP003
      EXP006 | EXP006
      EXP004 | EXP004  
      
  6. Успешное присоединение:

    ksql> SELECT D.DEAL_EXPENSE_CODE, E.EXPENSE_DESC \
    FROM deals D \
      LEFT JOIN expense_codes_table E \
      ON D.DEAL_EXPENSE_CODE = E.EXPENSE_CODE  \
    WINDOW TUMBLING (SIZE 3 MINUTE) \
    GROUP BY D.DEAL_EXPENSE_CODE, E.EXPENSE_DESC;
    
    EXP006 | KLM International
    EXP003 | Apple Corporation
    EXP002 | ABC - Sofia
    EXP004 | Confluent Europe
    EXP001 | Regulatory Deposit
    EXP005 | Air India
    
...