Это ошибка из-за непреднамеренного неправильного использования CREATE STREAM
в неправильном синтаксисе. Вы используете вариант для «регистрации» потока KSQL по существующей теме. Чтобы INSERT INTO
работал, он должен быть CREATE STREAM target AS SELECT
(«CSAS»).
Давайте проработаем это. Здесь я использую этот docker-compose для тестовой настройки.
Заполните некоторые фиктивные данные:
docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t events -P <<EOF
{"eventType":"1", "eventTime" :"2018-11-13-06:34:57", "sourceHostName":"asgard"}
{"eventType":"2", "eventTime" :"2018-11-13-06:35:57", "sourceHostName":"asgard"}
{"eventType":"MatchingValue", "eventTime" :"2018-11-13-06:35:58", "sourceHostName":"asgard"}
EOF
Зарегистрировать исходную тему с помощью KSQL:
CREATE STREAM event_stream (eventType varchar, eventTime varchar, sourceHostName varchar) WITH (kafka_topic='events', value_format='json');
Запрос потока:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT * FROM event_stream;
1542091084660 | null | 1 | 2018-11-13-06:34:57 | asgard
1542091084660 | null | 2 | 2018-11-13-06:35:57 | asgard
1542091785207 | null | MatchingValue | 2018-11-13-06:35:58 | asgard
Итак, глядя на CREATE STREAM
, который вы цитируете:
CREATE STREAM output_stream (eventTime varchar, extraColumn varchar, sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');
Я предполагаю, что если вы запустите LIST TOPICS;
, вы увидите, что эта тема уже существует в вашем брокере Kafka?
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------------------------
_confluent-metrics | false | 12 | 1 | 0 | 0
_schemas | false | 1 | 1 | 0 | 0
docker-connect-configs | false | 1 | 1 | 0 | 0
docker-connect-offsets | false | 25 | 1 | 0 | 0
docker-connect-status | false | 5 | 1 | 0 | 0
events | true | 1 | 1 | 0 | 0
output_events | false | 4 | 1 | 0 | 0
----------------------------------------------------------------------------------------------------
ksql>
Потому что, если бы не было, этот CREATE STREAM
потерпел бы неудачу:
ksql> CREATE STREAM output_stream (eventTime varchar, extraColumn varchar, sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');
Kafka topic does not exist: output_events
ksql>
Итак, делая это предположение, я также создаю эту тему на своем тестовом кластере:
$ docker-compose exec kafka bash -c "kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 4 --topic output_events"
И затем создание потока:
ksql> CREATE STREAM output_stream (eventTime varchar, extraColumn varchar, sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');
Message
----------------
Stream created
----------------
Обратите внимание, что там написано Stream created
, а не Stream created and running
Теперь давайте запустим INSERT INTO
:
ksql> INSERT INTO output_stream SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';
Message
-------------------------------
Insert Into query is running.
-------------------------------
Вывод DESCRIBE EXTENDED
действительно показывает, как вы видели, обрабатываемые сообщения:
ksql> DESCRIBE EXTENDED output_stream;
Name : OUTPUT_STREAM
Type : STREAM
Key field :
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : output_events (partitions: 4, replication: 1)
Field | Type
--------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
EVENTTIME | VARCHAR(STRING)
EXTRACOLUMN | VARCHAR(STRING)
SOURCEHOSTNAME | VARCHAR(STRING)
--------------------------------------------
Queries that write into this STREAM
-----------------------------------
InsertQuery_0 : INSERT INTO output_stream SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec: 0.01 total-messages: 1 last-message: 11/13/18 6:49:46 AM UTC
failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a
(Statistics of the local KSQL server interaction with the Kafka topic output_events)
Но сама тема не имеет сообщений:
ksql> print 'output_events' from beginning;
^C
ни поток KSQL:
ksql> SELECT * FROM OUTPUT_STREAM;
^CQuery terminated
Таким образом, команда INSERT INTO
предназначена для работы с существующим целевым потоком CSAS / CTAS , а не с исходным потоком STREAM, зарегистрированным для существующей темы.
Давайте попробуем так. Во-первых, нам нужно удалить существующее определение потока, а также завершить запрос INSERT INTO
:
ksql> DROP STREAM OUTPUT_STREAM;
Cannot drop OUTPUT_STREAM.
The following queries read from this source: [].
The following queries write into this source: [InsertQuery_0].
You need to terminate them before dropping OUTPUT_STREAM.
ksql> TERMINATE InsertQuery_0;
Message
-------------------
Query terminated.
-------------------
ksql> DROP STREAM OUTPUT_STREAM;
Message
------------------------------------
Source OUTPUT_STREAM was dropped.
------------------------------------
Теперь создайте целевой поток:
ksql> CREATE STREAM output_stream WITH (kafka_topic='output_events') AS SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';
Message
----------------------------
Stream created and running
----------------------------
Обратите внимание, что при создании потока это также running
(против того, что было просто created
). Теперь запросите поток:
ksql> SELECT * FROM OUTPUT_STREAM;
1542091785207 | null | 2018-11-13-06:35:58 | Extra info | asgard
и проверьте основную тему тоже:
ksql> PRINT 'output_events' FROM BEGINNING;
Format:JSON
{"ROWTIME":1542091785207,"ROWKEY":"null","EVENTTIME":"2018-11-13-06:35:58","EXTRACOLUMN":"Extra info","SOURCEHOSTNAME":"asgard"}
Итак, вы столкнулись с ошибкой в KSQL ( поднял здесь ), но ее, к счастью, можно избежать, если полностью использовать более простой синтаксис KSQL, объединяя ваши запросы CREATE STREAM
и INSERT INTO
в один ,