KSQL - INSERT INTO a Stream не дает никаких данных - PullRequest
0 голосов
/ 09 ноября 2018

У меня проблемы с чтением сообщений из потока, который заполняется с помощью операции INSERT INTO KSQL.

Шаги, которые я выполнил:

У меня есть поток event_stream, который я создал из темы кафки.

CREATE STREAM event_stream (eventType varchar, eventTime varchar, 
sourceHostName varchar) WITH (kafka_topic='events', value_format='json');

SELECT * FROM event_stream; показывает правильно поступающие сообщения.

Я хочу отправить некоторые из этих сообщений в другую тему в kafka, output_events, которую я уже создал.

Затем я создаю второй поток в KSQL:

CREATE STREAM output_stream (eventTime varchar, extraColumn varchar, 
sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');

Наконец, я связываю вход с выходом следующим образом:

INSERT INTO output_stream SELECT eventTime, 'Extra info' as extraColumn,    
sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';

Все вышеперечисленное, кажется, завершается без ошибок, однако, если я запускаю SELECT * FROM output_stream; Я не получаю данных. Почему это?

Выполнение части запроса SELECT вышеупомянутого запроса работает нормально, поэтому я вижу, что по теме приходят соответствующие результаты.

Странно, если я запускаю DESCRIBE EXTENDED output_stream, количество сообщений указывает, что сообщения достигают потока:

Local runtime statistics                                                                        
------------------------                                                                        
messages-per-sec:      0.33   total-messages:        86     last-message: 11/9/18 1:15:43 PM UTC
 failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a      
(Statistics of the local KSQL server interaction with the Kafka topic output_events) 

Я также проверил логи ksql-сервера, но не вижу там никаких ошибок.

1 Ответ

0 голосов
/ 13 ноября 2018

Это ошибка из-за непреднамеренного неправильного использования CREATE STREAM в неправильном синтаксисе. Вы используете вариант для «регистрации» потока KSQL по существующей теме. Чтобы INSERT INTO работал, он должен быть CREATE STREAM target AS SELECT («CSAS»).

Давайте проработаем это. Здесь я использую этот docker-compose для тестовой настройки.

Заполните некоторые фиктивные данные:

docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t events -P <<EOF
{"eventType":"1", "eventTime" :"2018-11-13-06:34:57", "sourceHostName":"asgard"}
{"eventType":"2", "eventTime" :"2018-11-13-06:35:57", "sourceHostName":"asgard"}
{"eventType":"MatchingValue", "eventTime" :"2018-11-13-06:35:58", "sourceHostName":"asgard"}
EOF

Зарегистрировать исходную тему с помощью KSQL:

CREATE STREAM event_stream (eventType varchar, eventTime varchar, sourceHostName varchar) WITH (kafka_topic='events', value_format='json');

Запрос потока:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT * FROM event_stream;
1542091084660 | null | 1 | 2018-11-13-06:34:57 | asgard
1542091084660 | null | 2 | 2018-11-13-06:35:57 | asgard
1542091785207 | null | MatchingValue | 2018-11-13-06:35:58 | asgard

Итак, глядя на CREATE STREAM, который вы цитируете:

CREATE STREAM output_stream (eventTime varchar, extraColumn varchar, sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');

Я предполагаю, что если вы запустите LIST TOPICS;, вы увидите, что эта тема уже существует в вашем брокере Kafka?

ksql> LIST TOPICS;

Kafka Topic            | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------------------------
_confluent-metrics     | false      | 12         | 1                  | 0         | 0
_schemas               | false      | 1          | 1                  | 0         | 0
docker-connect-configs | false      | 1          | 1                  | 0         | 0
docker-connect-offsets | false      | 25         | 1                  | 0         | 0
docker-connect-status  | false      | 5          | 1                  | 0         | 0
events                 | true       | 1          | 1                  | 0         | 0
output_events          | false      | 4          | 1                  | 0         | 0
----------------------------------------------------------------------------------------------------
ksql>

Потому что, если бы не было, этот CREATE STREAM потерпел бы неудачу:

ksql> CREATE STREAM output_stream (eventTime varchar, extraColumn varchar, sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');
Kafka topic does not exist: output_events
ksql>

Итак, делая это предположение, я также создаю эту тему на своем тестовом кластере:

$ docker-compose exec kafka bash -c "kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 4 --topic output_events"

И затем создание потока:

ksql> CREATE STREAM output_stream (eventTime varchar, extraColumn varchar, sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');

Message
----------------
Stream created
----------------

Обратите внимание, что там написано Stream created, а не Stream created and running

Теперь давайте запустим INSERT INTO:

ksql> INSERT INTO output_stream SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';

Message
-------------------------------
Insert Into query is running.
-------------------------------

Вывод DESCRIBE EXTENDED действительно показывает, как вы видели, обрабатываемые сообщения:

ksql> DESCRIBE EXTENDED output_stream;

Name                 : OUTPUT_STREAM
Type                 : STREAM
Key field            :
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : output_events (partitions: 4, replication: 1)

Field          | Type
--------------------------------------------
ROWTIME        | BIGINT           (system)
ROWKEY         | VARCHAR(STRING)  (system)
EVENTTIME      | VARCHAR(STRING)
EXTRACOLUMN    | VARCHAR(STRING)
SOURCEHOSTNAME | VARCHAR(STRING)
--------------------------------------------

Queries that write into this STREAM
-----------------------------------
InsertQuery_0 : INSERT INTO output_stream SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec:      0.01   total-messages:         1     last-message: 11/13/18 6:49:46 AM UTC
failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
(Statistics of the local KSQL server interaction with the Kafka topic output_events)

Но сама тема не имеет сообщений:

ksql> print 'output_events' from beginning;
^C

ни поток KSQL:

ksql> SELECT * FROM OUTPUT_STREAM;
^CQuery terminated

Таким образом, команда INSERT INTO предназначена для работы с существующим целевым потоком CSAS / CTAS , а не с исходным потоком STREAM, зарегистрированным для существующей темы.

Давайте попробуем так. Во-первых, нам нужно удалить существующее определение потока, а также завершить запрос INSERT INTO:

ksql> DROP STREAM OUTPUT_STREAM;
Cannot drop OUTPUT_STREAM.
The following queries read from this source: [].
The following queries write into this source: [InsertQuery_0].
You need to terminate them before dropping OUTPUT_STREAM.
ksql> TERMINATE InsertQuery_0;

Message
-------------------
Query terminated.
-------------------
ksql> DROP STREAM OUTPUT_STREAM;

Message
------------------------------------
Source OUTPUT_STREAM was dropped.
------------------------------------

Теперь создайте целевой поток:

ksql> CREATE STREAM output_stream WITH (kafka_topic='output_events') AS SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';

Message
----------------------------
Stream created and running
----------------------------

Обратите внимание, что при создании потока это также running (против того, что было просто created). Теперь запросите поток:

ksql> SELECT * FROM OUTPUT_STREAM;
1542091785207 | null | 2018-11-13-06:35:58 | Extra info | asgard

и проверьте основную тему тоже:

ksql> PRINT 'output_events' FROM BEGINNING;
Format:JSON
{"ROWTIME":1542091785207,"ROWKEY":"null","EVENTTIME":"2018-11-13-06:35:58","EXTRACOLUMN":"Extra info","SOURCEHOSTNAME":"asgard"}

Итак, вы столкнулись с ошибкой в ​​KSQL ( поднял здесь ), но ее, к счастью, можно избежать, если полностью использовать более простой синтаксис KSQL, объединяя ваши запросы CREATE STREAM и INSERT INTO в один ,

...