Как спорить темы с помощью ksql - PullRequest
0 голосов
/ 09 июля 2019

Я новичок в kafka и ksql, у меня есть 2 тестовые темы в kafka.

1-пользователи

{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"role"},{"type":"string","optional":true,"field":"status"},{"type":"boolean","optional":true,"field":"isPhoneVerified"},{"type":"array","items":{"type":"string","optional":true},"optional":true,"field":"personalDocuments"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"firstName"},{"type":"string","optional":true,"field":"lastName"},{"type":"string","optional":true,"field":"phoneNumber"},{"type":"string","optional":true,"field":"password"},{"type":"string","optional":true,"field":"pushToken"},{"type":"string","optional":false,"field":"created_at"},{"type":"string","optional":false,"field":"updated_at"},{"type":"int32","optional":true,"field":"__v"},{"type":"string","optional":true,"field":"verifyPhoneCode"},{"type":"string","optional":true,"field":"id"}],"optional":false,"name":"mongo_conn.digi.users"},"payload":{"role":"client","status":"active","isPhoneVerified":true,"personalDocuments":[],"email":"test@gmail.com","firstName":"test","lastName":"tifokaada","phoneNumber":"+99999999","password":"$2a$10kjjdytdLj77Wh2bvYuEJdh5lOZfI2wWQutzefwJ8EecabatPW","pushToken":"null","created_at":"2019-07-08 11:09:16 +0000","updated_at":"2019-07-08 14:23:57 +0000","__v":0,"verifyPhoneCode":null,"id":"5d23245c1b4bfb79141e43ce"}} __ v ": 0," verifyPhoneCode ": нулевой," идентификатор ":" 5d23245c1b4bfb79141e43ce "}}

2-сделки

{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"sender"},{"type":"string","optional":true,"field":"receiver"},{"type":"string","optional":true,"field":"receiverWalletId"},{"type":"string","optional":true,"field":"status"},{"type":"string","optional":true,"field":"type"},{"type":"int32","optional":true,"field":"amount"},{"type":"int32","optional":true,"field":"totalFee"},{"type":"string","optional":false,"field":"createdAt"},{"type":"string","optional":false,"field":"updatedAt"},{"type":"int32","optional":true,"field":"__v"},{"type":"string","optional":true,"field":"from"},{"type":"string","optional":true,"field":"orderId"},{"type":"string","optional":true,"field":"txId"},{"type":"string","optional":true,"field":"id"}],"optional":false,"name":"mongo_conn.digi.transactions"},"payload":{"sender":"5d20872531ad805fd8a6c88a","receiver":"5d20872531ad805fd8a6c88a","receiverWalletId":"5d20872531ad805fd8a6c88b","status":"success","type":"cashout","amount":50000,"totalFee":0,"createdAt":"2019-07-08 12:34:44 +0000","updatedAt":"2019-07-08 12:35:35 +0000","__v":0,"from":"smt","orderId":"1656b3c6-971a-715d-8e41-d8bc00a43714","txId":"2b35c8e823e3e7fcfa914432bf33ae36bb51b7412393b4ad4c8cde97481147b8","id":"5d234cbee69b5a16cad17535"}}

Я хочу объединить транзакции и пользователей в одну тему для дальнейшего анализа, я хочу объединить transactions.payload.reciever с users.payload.id, чтобы я мог наконец иметь транзакции с пользователями, вложенными в transactions.payload.reciever, возможно ли это? Я думал о чем-то вроде этого.

ksql> CREATE TABLE users (id VARCHAR, firstName VARCHAR, lastName VARCHAR, email VARCHAR, created_at VARCHAR , updated_at VARCHAR) WITH (KAFKA_TOPIC='mongo_conn.digi.users', VALUE_FORMAT='JSON', KEY=’id’);

ksql> CREATE STREAM transactions (reciever VARCHAR, sender VARCHAR, createdAt VARCHAR, amount int32, type VARCHAR) WITH (KAFKA_TOPIC= 'transactions', VALUE_FORMAT='JSON');

ksql> CREATE TABLE finaltopic WITH (VALUE_FORMAT='JSON') AS SELECT  firstName, lastName, created_at FROM users LEFT JOIN transactions a ON users.id = transactions.id;

Я также хочу знать, даже если мне удастся обернуть мои данные, нужен ли мне соединитель ksql для передачи моих данных вasticsearch или продолжать использовать ввод kafka, я знаю, что приведенный выше код неверен, я просто хочу получить идея или пример этого.

...