Вариант использования
Цель состоит в том, чтобы идентифицировать входящие события / строки, чтобы проверить, является ли это новой строкой или обновлением.Новая строка перейдет к другой теме, а строка обновления перейдет к другой теме.
Подход: создайте таблицу поиска (KTABLE) и выполните две операции соединения 1. Внутреннее объединение для обнаружения обновления.2. Левое соединение, когда правая клавиша таблицы пуста, чтобы обнаружить строку Вставить / Новая.Создайте два потока из результата вышеупомянутых двух операций.Запустите вставку в запрос к потоку, который вставит записи в таблицу поиска.
Шаги для воспроизведения проблемы здесь: (Занимает 7 минут)
Шаг 1 .docker-compose up
Последняя платформа Confluent 5.1.0 с докером .
Шаг 2 .docker ps
Примечание: убедитесь, что брокер работает. Брокер часто снижается в моем регионе.
Шаг 3 .
Получить доступ к bash реестра схемы в новом терминале. (Это легко контролировать, если оставить этот терминал открытым.)
docker run -it --net=cp-all-in-one_default --rm confluentinc/cp-schema-registry:5.1.0 bash
Шаг 4 .
Создание таблицы поиска.С темой LOAD.TEST.LOCAL.LOOKUP.TABLE.Моя схема имеет ключ типа String.Три образца записи ниже.Сначала вы заполняете таблицу поиска первичными 3 фиктивными записями.
kafka-avro-console-producer --broker-list broker:9092 --topic LOAD.TEST.LOCAL.LOOKUP.TABLE \
--property schema.registry.url=http://schema-registry:8081 \
--property parse.key=true \
--property key.separator=, \
--property key.schema='{"type":"string"}' \
--property value.schema='{"name":"LOAD.TEST.LOCAL.LOOKUP.TABLE","type":"record","namespace":"example.sender.batch","fields":[{"name":"SENDER_CODE","type":"string"},{"name":"SENDER_NAME","type":"string"},{"name":"SENDER_CATEGORY_CODE","type":"string"},{"name":"SENDER_AGENCY_CODE","type":"string"},{"name":"SENDER_SUB_AGENCY_CODE","type":"string"},{"name":"SENDER_FOREIGN_IND","type":"string"},{"name":"SENDER_FOREIGN_COUNTRY","type":"string"},{"name":"SENDER_NAME_ALTERNATE","type":"string"},{"name":"PARENT_SENDER_CODE","type":"string"},{"name":"CHANGE_DATE","type":"string"},{"name":"REQUESTING_LOCATION","type":"string"},{"name":"REQUEST_DATE","type":"string"},{"name":"REPLACEMENT_SENDER_CODE","type":"string"},{"name":"SENDER_STATUS","type":"string"},{"name":"SENDER_DUNS","type":"string"},{"name":"ADDRESSLINE1","type":"string"},{"name":"ADDRESSLINE2","type":"string"},{"name":"ADDRESSLINE3","type":"string"},{"name":"ADDRESS4","type":"string"},{"name":"CITY","type":"string"},{"name":"STATE","type":"string"},{"name":"POSTAL_CODE","type":"string"},{"name":"URL","type":"string"},{"name":"SENDER_ACRONYM","type":"string"},{"name":"DEACTIVATED_DATE","type":"string"},{"name":"Kafka_TimeEvent","type":"string"}]}'
Теперь вы можете вставить записи ниже.Просто вставьте 3 записи ниже.Если вы нажимаете return несколько раз и получаете исключение, просто запустите ту же команду, приведенную выше, еще раз и вставьте ее после нажатия return
один раз.
"SVI6FQ",{"SENDER_CODE":"SVI6FQ","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FR",{"SENDER_CODE":"SVI6FR","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"374 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 973","ADDRESSLINE3":"MAILBOXC","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FN",{"SENDER_CODE":"SVI6FN","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"372 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXA","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
нажмите ⌘+c
для выхода.
Шаг 5 .На другом терминале откройте KSQL CLI
docker run --network cp-all-in-one_default --interactive --tty --rm confluentinc/cp-ksql-cli:latest http://ksql-server:8088
Шаг 6 .Создайте KTABLE.
create table load_test_local_lookup_table with (KAFKA_TOPIC='LOAD.TEST.LOCAL.LOOKUP.TABLE',VALUE_FORMAT='AVRO',KEY='SENDER_CODE');
Шаг 7 .Обязательно установите свойство ниже, чтобы вы могли видеть результаты с начального смещения.Запустите это в KSQL.
ksql> SET 'auto.offset.reset'='earliest';
Вы увидите следующее сообщение.Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
Шаг 8 .Теперь создайте тему, где ваши события будут транслироваться.Используйте bash реестра схем из шага 4. Кроме того, заполните те же записи в основной теме.
kafka-avro-console-producer --broker-list broker:9092 --topic LOAD.TEST.LOCAL.EVENT.STREAM \
--property schema.registry.url=http://schema-registry:8081 \
--property parse.key=true \
--property key.separator=, \
--property key.schema='{"type":"string"}' \
--property value.schema='{"name":"LOAD.TEST.LOCAL.EVENT.STREAM","type":"record","namespace":"example.sender.batch","fields":[{"name":"SENDER_CODE","type":"string"},{"name":"SENDER_NAME","type":"string"},{"name":"SENDER_CATEGORY_CODE","type":"string"},{"name":"SENDER_AGENCY_CODE","type":"string"},{"name":"SENDER_SUB_AGENCY_CODE","type":"string"},{"name":"SENDER_FOREIGN_IND","type":"string"},{"name":"SENDER_FOREIGN_COUNTRY","type":"string"},{"name":"SENDER_NAME_ALTERNATE","type":"string"},{"name":"PARENT_SENDER_CODE","type":"string"},{"name":"CHANGE_DATE","type":"string"},{"name":"REQUESTING_LOCATION","type":"string"},{"name":"REQUEST_DATE","type":"string"},{"name":"REPLACEMENT_SENDER_CODE","type":"string"},{"name":"SENDER_STATUS","type":"string"},{"name":"SENDER_DUNS","type":"string"},{"name":"ADDRESSLINE1","type":"string"},{"name":"ADDRESSLINE2","type":"string"},{"name":"ADDRESSLINE3","type":"string"},{"name":"ADDRESS4","type":"string"},{"name":"CITY","type":"string"},{"name":"STATE","type":"string"},{"name":"POSTAL_CODE","type":"string"},{"name":"URL","type":"string"},{"name":"SENDER_ACRONYM","type":"string"},{"name":"DEACTIVATED_DATE","type":"string"},{"name":"Kafka_TimeEvent","type":"string"}]}'
"SVI6FQ",{"SENDER_CODE":"SVI6FQ","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FR",{"SENDER_CODE":"SVI6FR","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"374 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 973","ADDRESSLINE3":"MAILBOXC","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FN",{"SENDER_CODE":"SVI6FN","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"372 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXA","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
Шаг 9 .
Создать поток для этой темы мероприятия.
create stream load_test_local_event_stream with (KAFKA_TOPIC='LOAD.TEST.LOCAL.EVENT.STREAM',VALUE_FORMAT='AVRO',KEY='SENDER_CODE');
Шаг 10 .
Извлечь поток после объединения Stream-Table, который будет использоваться для обнаружения уже существующей строки.Мы называем это update_stream.Тема, созданная для этого потока ниже, будет иметь только обновления.Это один из моих вариантов использования.Я должен фильтровать сообщения, которые являются обновлениями.
create stream load_test_update_stream as select event.* FROM load_test_local_event_stream event JOIN load_test_local_lookup_table lookup ON event.sender_code = lookup.sender_Code;
Шаг 11 .
Создать поток с темой, которая используется для таблицы поиска.Так что, если вы хотите обновить таблицу поиска, вы можете вставить в этот поток.(Если я не ошибаюсь: вы не можете напрямую вставить в ktable из потока.).Делая это.
create stream load_test_lookup_feed_stream with (KAFKA_TOPIC='LOAD.TEST.LOCAL.LOOKUP.TABLE',VALUE_FORMAT='AVRO',KEY='SENDER_CODE');
Шаг 12 .Запустите вставку в запрос.Этот запрос будет вставлен в поток канала поиска в таблице, который обновит таблицу поиска, когда сообщение будет доступно в update_stream.
Insert into load_test_lookup_feed_stream select EVENT_SENDER_CODE AS SENDER_CODE, EVENT_SENDER_NAME AS SENDER_NAME, EVENT_SENDER_CATEGORY_CODE AS SENDER_CATEGORY_CODE , EVENT_SENDER_AGENCY_CODE AS SENDER_AGENCY_CODE , EVENT_SENDER_SUB_AGENCY_CODE AS SENDER_SUB_AGENCY_CODE, EVENT_SENDER_FOREIGN_IND AS SENDER_FOREIGN_IND, EVENT_SENDER_FOREIGN_COUNTRY AS SENDER_FOREIGN_COUNTRY , EVENT_SENDER_NAME_ALTERNATE AS SENDER_NAME_ALTERNATE, EVENT_PARENT_SENDER_CODE AS PARENT_SENDER_CODE ,EVENT_CHANGE_DATE AS CHANGE_DATE, EVENT_REQUESTING_LOCATION AS REQUESTING_LOCATION , EVENT_REQUEST_DATE AS REQUEST_DATE, EVENT_REPLACEMENT_SENDER_CODE AS REPLACEMENT_SENDER_CODE , EVENT_SENDER_STATUS AS SENDER_STATUS, EVENT_SENDER_DUNS AS SENDER_DUNS , EVENT_ADDRESSLINE1 AS ADDRESSLINE1 , EVENT_ADDRESSLINE2 AS ADDRESSLINE2, EVENT_ADDRESSLINE3 AS ADDRESSLINE3 , EVENT_ADDRESS4 AS ADDRESS4 , EVENT_CITY AS CITY , EVENT_STATE AS STATE, EVENT_POSTAL_CODE AS POSTAL_CODE, EVENT_URL AS URL, EVENT_SENDER_ACRONYM AS SENDER_ACRONYM , EVENT_DEACTIVATED_DATE AS DEACTIVATED_DATE, EVENT_KAFKA_TIMEEVENT AS KAFKA_TIMEEVENT from load_test_update_stream partition by SENDER_CODE ;
: exclamation: Проблема: Несмотря на то, что это обновляет мою таблицу поиска, она обновляется как новая запись.Не как обновление.чтобы повторить эту проблему (выполните шаги 15А).
Шаг 13 .Очень похоже на обновление, создайте поток, который будет обнаруживать новые записи в событии.
create stream load_test_insert_stream as select event.* FROM load_test_local_event_stream event left JOIN load_test_local_lookup_table lookup ON event.sender_code = lookup.sender_Code where lookup.sender_Code is null ;
Проверка. При желании можно выполнить только запрос select, чтобы понять, что происходит.Если ваш bash схемы-реестра открыт, вставьте новую запись со своим собственным ключом.(Попробуйте вставить новую запись, как в разделе 15А).Это новое сообщение будет доступно в этом потоке.
Шаг 14 .Создайте вставку в запрос, как раньше. Это вставка обратно в таблицу поиска.Теперь ваша таблица поиска заполнена новым сообщением.
Insert into load_test_lookup_feed_stream select EVENT_SENDER_CODE AS SENDER_CODE, EVENT_SENDER_NAME AS SENDER_NAME, EVENT_SENDER_CATEGORY_CODE AS SENDER_CATEGORY_CODE , EVENT_SENDER_AGENCY_CODE AS SENDER_AGENCY_CODE , EVENT_SENDER_SUB_AGENCY_CODE AS SENDER_SUB_AGENCY_CODE, EVENT_SENDER_FOREIGN_IND AS SENDER_FOREIGN_IND, EVENT_SENDER_FOREIGN_COUNTRY AS SENDER_FOREIGN_COUNTRY , EVENT_SENDER_NAME_ALTERNATE AS SENDER_NAME_ALTERNATE, EVENT_PARENT_SENDER_CODE AS PARENT_SENDER_CODE ,EVENT_CHANGE_DATE AS CHANGE_DATE, EVENT_REQUESTING_LOCATION AS REQUESTING_LOCATION , EVENT_REQUEST_DATE AS REQUEST_DATE, EVENT_REPLACEMENT_SENDER_CODE AS REPLACEMENT_SENDER_CODE , EVENT_SENDER_STATUS AS SENDER_STATUS, EVENT_SENDER_DUNS AS SENDER_DUNS , EVENT_ADDRESSLINE1 AS ADDRESSLINE1 , EVENT_ADDRESSLINE2 AS ADDRESSLINE2, EVENT_ADDRESSLINE3 AS ADDRESSLINE3 , EVENT_ADDRESS4 AS ADDRESS4 , EVENT_CITY AS CITY , EVENT_STATE AS STATE, EVENT_POSTAL_CODE AS POSTAL_CODE, EVENT_URL AS URL, EVENT_SENDER_ACRONYM AS SENDER_ACRONYM , EVENT_DEACTIVATED_DATE AS DEACTIVATED_DATE, EVENT_KAFKA_TIMEEVENT AS KAFKA_TIMEEVENT from load_test_insert_stream partition by SENDER_CODE ;
Шаг 15 .
В чем проблема: Как копировать.
Шаг 15А . Как вставить новую запись примера
Запустите команду на шаге 8 (со схемой).Вставьте / вставьте новую запись, как показано ниже. Обратите внимание, я изменил и ключ сообщения, и код отправителя.Всегда ваш ключ сообщения и ключ строки должны совпадать. например: "SVI6FW","SENDER_CODE":"SVI6FW
"SVI6FW",{"SENDER_CODE":"SVI6FW","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
Шаг 15B . Как обновить пример записи
Очень похоже на предыдущую (15А) вставку новой записи, но используйте тот же ключ сообщения и просто обновите имя или какое-либо значение.Например, 'SAM II'
стало 'SAM III'
"SVI6FW",{"SENDER_CODE":"SVI6FW","SENDER_NAME":"SENDER SAM III","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
Проблема Если вы видите, Моя таблица поиска не обновляется, Она обрабатывает каждое сообщение как новое, даже если оноотправлено с тем же ключом.Из-за этого я не могу обнаружить обновления.Каждое сообщение идет как новое сообщение.
Вы можете проверить, выполнив следующие действия.
- Отправить новое сообщение с вашим собственным ключом (15A).Это будет доступно в load_test_insert_stream.
- Отправьте обновленное сообщение с тем же ключом, что и 15B.Он должен быть доступен в load_test_update_stream, но он собирается загрузить load_test_insert_stream.и таблица поиска обрабатывает его как новое сообщение.
Любой новый подход / предложения приветствуются!