Конвейер не загружает данные в таблицу memsql, используя процедуру - PullRequest
0 голосов
/ 06 февраля 2019

Я запихнул json (20 пар ключ-значение) в kafka и смог его тоже использовать - проверил, чтобы убедиться, что данные успешно передаются в kafka или нет.

следующий скриптсоздание конвейера, но он не загружает данные в таблицу memsql.Нужно ли мне изменить сценарий создания конвейера для типа данных JSON.

CREATE OR REPLACE PIPELINE omnitracs_gps_evt_pipeline
AS LOAD DATA KAFKA '192.168.188.110:9092/ib_Omnitracs' 
INTO procedure INGEST_OMNITRACS_EVT_PROC;

DELIMITER //
CREATE OR REPLACE PROCEDURE INGEST_OMNITRACS_EVT_PROC(batch query(evt_json json))
AS
BEGIN
    INSERT INTO TEST(id, name) 
      SELECT evt_json::ignition,evt_json::positiontype
      FROM batch;
      ECHO SELECT 'HELLO';
END
//
DELIMITER ; 

TEST PIPELINE omnitracs_gps_evt_pipeline LIMIT 5;
START PIPELINE omnitracs_gps_evt_pipeline FOREGROUND LIMIT 5 BATCHES;

Может кто-нибудь помочь, пожалуйста, как это должно быть.

Ответы [ 3 ]

0 голосов
/ 06 февраля 2019

Вы, вероятно, должны изменить свое предложение CREATE PIPELINE AS LOAD DATA для выполнения собственной загрузки JSON, как описано здесь: https://docs.memsql.com/sql-reference/v6.7/load-data/#json-load-data.

Существует две причины:

  • Конвейер, как написано, будет ожидать ввода от Кафки в формате TSV с 1 полем.TSV - это формат по умолчанию, который выводит ожидаемое количество полей из параметров в целевую хранимую процедуру.На самом деле вполне вероятно, что входные JSON-записи будут успешно проанализированы как таковые, но я бы не стал полагаться на это.

  • Было бы более эффективно использовать предложение subvalue_mapping собственного конвейера JSONизвлечь и вставить :: ignition и :: positiontype, полностью пропуская издержки хранимой процедуры.Кроме того, записанный конвейер будет создавать временные структуры данных JSON в памяти, и это относительно дорого.

Я бы предложил что-то вроде следующего:

CREATE OR REPLACE PIPELINE omnitracs_gps_evt_pipeline
AS LOAD DATA KAFKA '192.168.188.110:9092/ib_Omnitracs' 
INTO TABLE TEST
FORMAT JSON
( 
  id <- ignition_event,
  name <- position_type
);
0 голосов
/ 13 февраля 2019

Трубопровод теперь работает после удаления конфигурации ProducerConfig.TRANSACTIONAL_ID_CONFIG от производителя в kafka.

CREATE PIPELINE FEB13_PIPELINE_2
AS LOAD DATA KAFKA '192.168.188.110:9092/FEB13_PROC' 
INTO procedure INGEST_EVT_PROC;

DELIMITER //
CREATE OR REPLACE PROCEDURE INGEST_EVT_PROC(batch query(evt_json json))
AS
BEGIN
    INSERT INTO TEST_FEB13(ID, NAME) 
      SELECT evt_json::ID,evt_json::NAME
      FROM batch;
END
//
DELIMITER ;

Только одно небольшое сомнение, теперь даже двойная кавычка добавляется в таблицуколонка.Как избежать этого.JSON отправил kafka: "{'ID': 1, 'NAME': \ 'a \'}"

0 голосов
/ 06 февраля 2019

ECHO SELECT не разрешен в хранимых процедурах конвейера.Вы должны были получить сообщение об ошибке, когда вы запустили START PIPELINE ... FOREGROUND или во время CREATE PIPELINE, если процедура была определена.

...