Попробуйте анализировать типы данных в PipelineDB и передавать сообщения об ошибках в текстовую таблицу с ошибками? - PullRequest
0 голосов
/ 15 мая 2019

Мы использовали Pipeline DB для получения данных в потоковую таблицу и в двух потоковых представлениях, в одном представлении, отфильтровывали записи, которые не могли бы привести к типу ошибок проверки достоверности, а в другом представлении фильтровали записи, которые потеряли ошибки при типизации.,В идеале мы пытаемся отделить хорошие записи от плохих записей и сделать так, чтобы они материализовались в две финальные таблицы.

Например, система была настроена на получение данных от третьей стороны в формате YYYY / MM / DD HH24: MI: SS, но по какой-то причине показались значения, где меняются день и месяц.В PipelineDB, поскольку использование PostGres SQL «to_timestamp (mycolumn,« YYYY / MM / DD HH24: MI: SS »)» вызовет серьезную ошибку, если текст в «mycolumn» будет выглядеть примерно так: «2019/15/05 13:10:24.И любые записи, поступающие в поток в рамках этой транзакции, откатываются.(Таким образом, если PG Copy использовалась, одна запись для сбоя представления потоковой передачи материалов приводит к тому, что все записи вставляются в ноль. Это не идеальный сценарий в автоматизации данных, когда сторонняя автоматизированная система может меньше заботиться о нашей проблеме для обработкиего данные.)

Из того, что я вижу: - PostGres не имеет "собственного SQL" способа выполнить "try-parse" - PipelineDB не поддерживает пользовательские функции (если мы написали функцию с двумя выходными даннымиодин для анализа значения, другой возвращает логический столбец "is_valid").(Я предполагаю, что функция находится на сервере, а pipelinedb выполняется как сторонний сервер, а это совсем другое.)

В идеале функция возвращала бы значение typecast и логический флаг, если это былодопустимый, и его можно использовать в предложении WHERE потоковых представлений для ветвления хороших записей из плохих записей.Но я не могу решить это?Есть мысли?

1 Ответ

0 голосов
/ 16 мая 2019

Через много времени я нашел решение этой проблемы. Мне это не нравится, но это сработает.

Меня осенило, когда я понял, что вся проблема основывается на следующем:

http://docs.pipelinedb.com/continuous-transforms.html «Вы можете думать о непрерывных преобразованиях как о триггерах поверх входящих потоковых данных, где функция триггера выполняется для каждого нового вывода строки при непрерывном преобразовании. Внутренне функция выполняется как триггер AFTER INSERT FOR EACH ROW, поэтому OLD отсутствует строка, а строка NEW содержит строку, выводимую непрерывным преобразованием. "

Я потратил часы, пытаясь выяснить: «почему не работают мои пользовательские функции, которые я написал для« пробного разбора »типов данных для входящих потоков данных? Ничего не будет отображаться в материализационном представлении или выходной таблице?» PipelineDB генерирует серьезные ошибки? И затем, через несколько часов, я понимаю, что проблема была связана с тем, что PipelineDB не мог обрабатывать пользовательские функции, а скорее в том, что при непрерывном преобразовании происходит преобразование, выраженное как SQL «ПОСЛЕ ТОКА, КАК ВСТАВЛЕНО». Таким образом, по сути, изменение типотипирования поля данных в материализованном потоке не удалось до его начала.

Решение (которое не очень элегантно) заключается в следующем: 1 - переместить логику приведения типов или любую логику SQL, которая может привести к ошибке, в функцию триггера 2 - создать секцию «EXCEPTION WHEN THEN» внутри функции триггера 3 - убедитесь, что ВОЗВРАЩАЕТСЯ НОВЫМ; происходит в обоих случаях успешного и неудачного преобразования. 4 - сделать непрерывное преобразование простым проходом без применения логики, просто вызвать триггер. (В этом случае это действительно сводит на нет весь смысл использования PipelineDB в некоторой степени для этой начальной проблемы с размещением данных. Но я отступаю.)

После этого я создал таблицу для регистрации ошибок и, убедившись, что все перечисленные выше три шага выполнены, мы обеспечим успешное выполнение транзакции.

Это важно, потому что, если этого не сделано и «вы получаете исключение в исключении», или вы не обработали исключение изящно, то никакие записи не будут загружены.

Это поддерживает стратегию: мы просто пытаемся сделать обработку данных "форк в реке", чтобы маршрутизировать записи, которые успешно преобразуются в одну таблицу (или потоковую таблицу) в одну сторону, и записи, которые не преобразуются в таблицу ошибок .

Ниже я показываю POC, где мы обрабатываем записи как поток и материализуем их в физическую таблицу. (это может быть и другой поток). Ключи к этому понимают:

В таблице ошибок используются текстовые столбцы Функция триггера фиксирует ошибки при попытке преобразования и записывает их в таблицу ошибок с базовым описанием ошибки из системы.

Я упоминаю, что мне не «нравится» решение, но это было лучшее, что я смог найти за несколько часов, чтобы обойти ограничение PipelineDB, выполняющего функции триггера после вставки, поэтому сбой при вставке не мог не пойман, а pipelinedb не имеет встроенной возможности для обработки: - продолжение процесса потока внутри транзакции при сбое - изящный сбой на уровне строк и упрощенный механизм для маршрутизации неудачных преобразований в таблицу ошибок

DROP SCHEMA IF EXISTS pdb CASCADE;
CREATE SCHEMA IF NOT EXISTS pdb;


DROP TABLE IF EXISTS pdb.lis_final;
CREATE TABLE pdb.lis_final(
    edm___row_id bigint,
    edm___created_dtz timestamp with time zone DEFAULT current_timestamp,
    edm___updatedat_dtz timestamp with time zone DEFAULT current_timestamp,
    patient_id text,
    encounter_id text,
    order_id text,
    sample_id text,
    container_id text,
    result_id text,
    orderrequestcode text,
    orderrequestname text,
    testresultcode text,
    testresultname text,
    testresultcost text,
    testordered_dt timestamp,
    samplereceived_dt timestamp,
    testperformed_dt timestamp,
    testresultsreleased_dt timestamp,
    extractedfromsourceat_dt timestamp,
    birthdate_d date
);

DROP TABLE IF EXISTS pdb.lis_errors;
CREATE TABLE pdb.lis_errors(
    edm___row_id bigint,
    edm___errorat_dtz timestamp with time zone default current_timestamp,
    edm___errormsg text,
    patient_id text,
    encounter_id text,
    order_id text,
    sample_id text,
    container_id text,
    result_id text,
    orderrequestcode text,
    orderrequestname text,
    testresultcode text,
    testresultname text,
    testresultcost text,
    testordered_dt text,
    samplereceived_dt text,
    testperformed_dt text,
    testresultsreleased_dt text,
    extractedfromsourceat_dt text,
    birthdate_d text
);


DROP FOREIGN TABLE IF EXISTS pdb.lis_streaming_table CASCADE;
CREATE FOREIGN TABLE pdb.lis_streaming_table (
    edm___row_id serial,
    patient_id text,
    encounter_id text,
    order_id text,
    sample_id text,
    container_id text,
    result_id text,
    orderrequestcode text,
    orderrequestname text,
    testresultcode text,
    testresultname text,
    testresultcost text,
    testordered_dt text,
    samplereceived_dt text,
    testperformed_dt text,
    testresultsreleased_dt text,
    extractedfromsourceat_dt text,
    birthdate_d text
)
SERVER pipelinedb;


CREATE OR REPLACE FUNCTION insert_into_t()
  RETURNS trigger AS
  $$
  BEGIN

    INSERT INTO pdb.lis_final
    SELECT
        NEW.edm___row_id,
        current_timestamp as edm___created_dtz,
        current_timestamp as edm___updatedat_dtz,
        NEW.patient_id,
        NEW.encounter_id,
        NEW.order_id,
        NEW.sample_id,
        NEW.container_id,
        NEW.result_id,
        NEW.orderrequestcode,
        NEW.orderrequestname,
        NEW.testresultcode,
        NEW.testresultname,
        NEW.testresultcost,
        to_timestamp(NEW.testordered_dt,'YYYY/MM/DD HH24:MI:SS') as testordered_dt,
        to_timestamp(NEW.samplereceived_dt,'YYYY/MM/DD HH24:MI:SS') as samplereceived_dt,
        to_timestamp(NEW.testperformed_dt,'YYYY/MM/DD HH24:MI:SS') as testperformed_dt,
        to_timestamp(NEW.testresultsreleased_dt,'YYYY/MM/DD HH24:MI:SS') as testresultsreleased_dt,
        to_timestamp(NEW.extractedfromsourceat_dt,'YYYY/MM/DD HH24:MI:SS') as extractedfromsourceat_dt,
        to_date(NEW.birthdate_d,'YYYY/MM/DD') as birthdate_d;

    -- Return new as nothing happens
    RETURN NEW;

    EXCEPTION WHEN others THEN

        INSERT INTO pdb.lis_errors
        SELECT
            NEW.edm___row_id,
            current_timestamp as edm___errorat_dtz,
            SQLERRM as edm___errormsg,
            NEW.patient_id,
            NEW.encounter_id,
            NEW.order_id,
            NEW.sample_id,
            NEW.container_id,
            NEW.result_id,
            NEW.orderrequestcode,
            NEW.orderrequestname,
            NEW.testresultcode,
            NEW.testresultname,
            NEW.testresultcost,
            NEW.testordered_dt,
            NEW.samplereceived_dt,
            NEW.testperformed_dt,
            NEW.testresultsreleased_dt,
            NEW.extractedfromsourceat_dt,
            NEW.birthdate_d;

        -- Return new back to the streaming view as we don't want that process to error.  We already routed the record above to the errors table as text.
        RETURN NEW;

  END;
  $$
  LANGUAGE plpgsql;


DROP VIEW IF EXISTS pdb.lis_tryparse CASCADE;
CREATE VIEW pdb.lis_tryparse WITH (action=transform, outputfunc=insert_into_t) AS
SELECT
    edm___row_id,
    patient_id,
    encounter_id,
    order_id,
    sample_id,
    container_id,
    result_id,
    orderrequestcode,
    orderrequestname,
    testresultcode,
    testresultname,
    testresultcost,
    testordered_dt,
    samplereceived_dt,
    testperformed_dt,
    testresultsreleased_dt,
    extractedfromsourceat_dt,
    birthdate_d
FROM pdb.lis_streaming_table as st;
...