Kafka ksql простое соединение не работает - PullRequest
0 голосов
/ 16 мая 2018

Я переписал данные в потоке и таблице. Я использую Confluent 4.1

1) Создать поток

   CREATE STREAM session_details_stream (Media varchar ,SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'sessionDetails', value_format = 'json');

2) Создать переписанный поток, так как этот скрипт не работает, но до этого егоработает, почему?

CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by root;

затем я создаю следующий сценарий s

CREATE STREAM session_details_stream_update as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by SessionIdTime;
CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,root from session_details_stream_update  partition by root;

Результат из session_details_stream_rekeyed одобрен:

ksql> select * from session_details_stream_rekeyed;
      1526411486488 | 2018-02-05T15:16:07.113+02:001| tex | 2018-02-05T15:16:07.113+02:001 | 1 | 2018-02-05T15:16:07.113+02:001

3) создайте поток для тем;

 CREATE STREAM voip_details_stream (SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'voipDetails', value_format = 'json');
 CREATE STREAM voip_details_stream_update as select SessionIdTime ,SessionIdSeq, CONCAT(SESSIONIDTIME,SESSIONIDSEQ) as root from voip_details_stream  partition by SessionIdTime;
 CREATE STREAM voip_details_stream_rekeyed6 as select SessionIdTime ,SessionIdSeq,root from voip_details_stream_update  partition by root;



 ksql> select * from voip_details_stream_rekeyed6;
      1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001

4) создать таблицу

 CREATE TABLE voipDetails_table_test(SessionIdTime varchar,SessionIdSeq long,root varchar) WITH (kafka_topic='VOIP_DETAILS_STREAM_REKEYED6', value_format='JSON', KEY='root');

 ksql> select * from voip_details_table;

       1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001

5) затем создать левое соединение

select  c.root,u.root from session_details_stream_rekeyed c LEFT JOIN voipDetails_table_test u On c.root  = u.root;

   1526411477780 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:001 | null

в чем проблема?

1 Ответ

0 голосов
/ 17 мая 2018

tl; dr При выполнении соединения с потоковой таблицей ваши сообщения table должны уже существовать (и должны иметь временную метку) перед сообщениями потока.Если вы повторно отправляете исходные сообщения потока, после того как тема таблицы заполнена, объединение будет успешным.

Пример данных

Используйте kafkacat для заполнения тем (вставьте данные в stdin)

cat > /tmp/msgs <<EOF
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
EOF
kafkacat -b localhost:9092 -P -t sessionDetails /tmp/msgs


cat > /tmp/msgs <<EOF
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}
EOF
kafkacat -b localhost:9092 -P -t voipDetails /tmp/msgs

Проверка содержимого темы:

Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t sessionDetails
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}

Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t voipDetails
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}

Объявление потоков источника

ksql> CREATE STREAM session_details_stream \
      (Media varchar ,SessionIdTime varchar,SessionIdSeq long) \
      WITH (KAFKA_TOPIC = 'sessionDetails', VALUE_FORMAT = 'json');

 Message
----------------
 Stream created
----------------
ksql> CREATE STREAM voip_details_stream \
      (SessionIdTime varchar,SessionIdSeq long, Details varchar) \
      WITH (KAFKA_TOPIC = 'voipDetails', VALUE_FORMAT = 'json');

 Message
----------------
 Stream created
----------------
ksql> select * from session_details_stream;
1526553130864 | null | Foo | 2018-05-17 11:25:33 BST | 1
1526553130865 | null | Foo | 2018-05-17 11:26:33 BST | 2
^CQuery terminated
ksql> select * from voip_details_stream;
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1a
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1b
1526553143176 | null | 2018-05-17 11:26:33 BST | 2 | Bar2
^CQuery terminated

Повторное разделение каждой темы на SessionIdTime + SessionIdSeq

ksql> CREATE STREAM SESSION AS \
      SELECT Media, CONCAT(SessionIdTime,SessionIdSeq) AS root \
      FROM session_details_stream \
      PARTITION BY root;

 Message
----------------------------
 Stream created and running
----------------------------


ksql> SELECT ROWTIME, ROWKEY, root, media FROM SESSION;
1526553130864 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Foo
1526553130865 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Foo


ksql> CREATE STREAM VOIP AS \
      SELECT CONCAT(SessionIdTime,SessionIdSeq) AS root, details \
      FROM voip_details_stream \
      PARTITION BY root;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

Объявление таблицы

ksql> CREATE TABLE VOIP_TABLE (root VARCHAR, details VARCHAR) \
      WITH (KAFKA_TOPIC='VOIP', VALUE_FORMAT='JSON', KEY='root');

 Message
---------------
 Table created
---------------
ksql> SELECT ROWTIME, ROWKEY, root, details FROM VOIP;
1526553143176 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Bar2
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1a
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1b

Присоединение потока SESSION к таблице VOIP

ksql> SELECT s.ROWTIME, s.root, s.media, v.details \
      FROM SESSION s \
      LEFT OUTER JOIN VOIP_TABLE v ON S.root = V.root;
1526553130864 | 2018-05-17 11:25:33 BST1 | Foo | null
1526553130865 | 2018-05-17 11:26:33 BST2 | Foo | null

Оставьте указанный выше запрос JOIN работающим.Повторно отправьте сообщение SESSION в исходную тему (используйте kafkacat для отправки тех же сообщений на sessionDetails, как указано выше):

1526553862403 | 2018-05-17 11:25:33 BST1 | Foo | Bar1a
1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2

Per Rohan Desai в Confluent Community Slack :

Проблема в том, что время записи в записи из вашего потока раньше, чем время записи в вашей таблице, к которой вы ожидаете присоединиться.Таким образом, при обработке записи потока в таблице нет соответствующей записи

Просмотр сообщения в исходной таблице для одного из ключей соединения с использованием ROWTIME для просмотра метки времени сообщения ( не путать с меткой времени root):

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, details from VOIP WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:23 | 1526553143176 | 2018-05-17 11:26:33 BST2 | Bar2

Сравните это с сообщением в теме потока исходного сеанса:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, media from SESSION WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:10 | 1526553130865 | 2018-05-17 11:26:33 BST2 | Foo
2018-05-17 11:46:28 | 1526553988639 | 2018-05-17 11:26:33 BST2 | Foo

первые из них (на 11:32:10 / 1526553130865) предшествуют соответствующим сообщениям VOIP (показанным выше) и привели к результату соединения null, который мы впервые увидели. секунда из них датирована позже (11:46:28 / 1526553988639), и получилось успешное соединение, которое мы впоследствии увидели:

1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2
...