Через много времени я нашел решение этой проблемы. Мне это не нравится, но это сработает.
Меня осенило, когда я понял, что вся проблема основывается на следующем:
http://docs.pipelinedb.com/continuous-transforms.html
«Вы можете думать о непрерывных преобразованиях как о триггерах поверх входящих потоковых данных, где функция триггера выполняется для каждого нового вывода строки при непрерывном преобразовании. Внутренне функция выполняется как триггер AFTER INSERT FOR EACH ROW, поэтому OLD отсутствует строка, а строка NEW содержит строку, выводимую непрерывным преобразованием. "
Я потратил часы, пытаясь выяснить: «почему не работают мои пользовательские функции, которые я написал для« пробного разбора »типов данных для входящих потоков данных? Ничего не будет отображаться в материализационном представлении или выходной таблице?» PipelineDB генерирует серьезные ошибки? И затем, через несколько часов, я понимаю, что проблема была связана с тем, что PipelineDB не мог обрабатывать пользовательские функции, а скорее в том, что при непрерывном преобразовании происходит преобразование, выраженное как SQL «ПОСЛЕ ТОКА, КАК ВСТАВЛЕНО». Таким образом, по сути, изменение типотипирования поля данных в материализованном потоке не удалось до его начала.
Решение (которое не очень элегантно) заключается в следующем:
1 - переместить логику приведения типов или любую логику SQL, которая может привести к ошибке, в функцию триггера
2 - создать секцию «EXCEPTION WHEN THEN» внутри функции триггера
3 - убедитесь, что ВОЗВРАЩАЕТСЯ НОВЫМ; происходит в обоих случаях успешного и неудачного преобразования.
4 - сделать непрерывное преобразование простым проходом без применения логики, просто вызвать триггер. (В этом случае это действительно сводит на нет весь смысл использования PipelineDB в некоторой степени для этой начальной проблемы с размещением данных. Но я отступаю.)
После этого я создал таблицу для регистрации ошибок и, убедившись, что все перечисленные выше три шага выполнены, мы обеспечим успешное выполнение транзакции.
Это важно, потому что, если этого не сделано и «вы получаете исключение в исключении», или вы не обработали исключение изящно, то никакие записи не будут загружены.
Это поддерживает стратегию: мы просто пытаемся сделать обработку данных "форк в реке", чтобы маршрутизировать записи, которые успешно преобразуются в одну таблицу (или потоковую таблицу) в одну сторону, и записи, которые не преобразуются в таблицу ошибок .
Ниже я показываю POC, где мы обрабатываем записи как поток и материализуем их в физическую таблицу. (это может быть и другой поток). Ключи к этому понимают:
В таблице ошибок используются текстовые столбцы
Функция триггера фиксирует ошибки при попытке преобразования и записывает их в таблицу ошибок с базовым описанием ошибки из системы.
Я упоминаю, что мне не «нравится» решение, но это было лучшее, что я смог найти за несколько часов, чтобы обойти ограничение PipelineDB, выполняющего функции триггера после вставки, поэтому сбой при вставке не мог не пойман, а pipelinedb не имеет встроенной возможности для обработки:
- продолжение процесса потока внутри транзакции при сбое
- изящный сбой на уровне строк и упрощенный механизм для маршрутизации неудачных преобразований в таблицу ошибок
DROP SCHEMA IF EXISTS pdb CASCADE;
CREATE SCHEMA IF NOT EXISTS pdb;
DROP TABLE IF EXISTS pdb.lis_final;
CREATE TABLE pdb.lis_final(
edm___row_id bigint,
edm___created_dtz timestamp with time zone DEFAULT current_timestamp,
edm___updatedat_dtz timestamp with time zone DEFAULT current_timestamp,
patient_id text,
encounter_id text,
order_id text,
sample_id text,
container_id text,
result_id text,
orderrequestcode text,
orderrequestname text,
testresultcode text,
testresultname text,
testresultcost text,
testordered_dt timestamp,
samplereceived_dt timestamp,
testperformed_dt timestamp,
testresultsreleased_dt timestamp,
extractedfromsourceat_dt timestamp,
birthdate_d date
);
DROP TABLE IF EXISTS pdb.lis_errors;
CREATE TABLE pdb.lis_errors(
edm___row_id bigint,
edm___errorat_dtz timestamp with time zone default current_timestamp,
edm___errormsg text,
patient_id text,
encounter_id text,
order_id text,
sample_id text,
container_id text,
result_id text,
orderrequestcode text,
orderrequestname text,
testresultcode text,
testresultname text,
testresultcost text,
testordered_dt text,
samplereceived_dt text,
testperformed_dt text,
testresultsreleased_dt text,
extractedfromsourceat_dt text,
birthdate_d text
);
DROP FOREIGN TABLE IF EXISTS pdb.lis_streaming_table CASCADE;
CREATE FOREIGN TABLE pdb.lis_streaming_table (
edm___row_id serial,
patient_id text,
encounter_id text,
order_id text,
sample_id text,
container_id text,
result_id text,
orderrequestcode text,
orderrequestname text,
testresultcode text,
testresultname text,
testresultcost text,
testordered_dt text,
samplereceived_dt text,
testperformed_dt text,
testresultsreleased_dt text,
extractedfromsourceat_dt text,
birthdate_d text
)
SERVER pipelinedb;
CREATE OR REPLACE FUNCTION insert_into_t()
RETURNS trigger AS
$$
BEGIN
INSERT INTO pdb.lis_final
SELECT
NEW.edm___row_id,
current_timestamp as edm___created_dtz,
current_timestamp as edm___updatedat_dtz,
NEW.patient_id,
NEW.encounter_id,
NEW.order_id,
NEW.sample_id,
NEW.container_id,
NEW.result_id,
NEW.orderrequestcode,
NEW.orderrequestname,
NEW.testresultcode,
NEW.testresultname,
NEW.testresultcost,
to_timestamp(NEW.testordered_dt,'YYYY/MM/DD HH24:MI:SS') as testordered_dt,
to_timestamp(NEW.samplereceived_dt,'YYYY/MM/DD HH24:MI:SS') as samplereceived_dt,
to_timestamp(NEW.testperformed_dt,'YYYY/MM/DD HH24:MI:SS') as testperformed_dt,
to_timestamp(NEW.testresultsreleased_dt,'YYYY/MM/DD HH24:MI:SS') as testresultsreleased_dt,
to_timestamp(NEW.extractedfromsourceat_dt,'YYYY/MM/DD HH24:MI:SS') as extractedfromsourceat_dt,
to_date(NEW.birthdate_d,'YYYY/MM/DD') as birthdate_d;
-- Return new as nothing happens
RETURN NEW;
EXCEPTION WHEN others THEN
INSERT INTO pdb.lis_errors
SELECT
NEW.edm___row_id,
current_timestamp as edm___errorat_dtz,
SQLERRM as edm___errormsg,
NEW.patient_id,
NEW.encounter_id,
NEW.order_id,
NEW.sample_id,
NEW.container_id,
NEW.result_id,
NEW.orderrequestcode,
NEW.orderrequestname,
NEW.testresultcode,
NEW.testresultname,
NEW.testresultcost,
NEW.testordered_dt,
NEW.samplereceived_dt,
NEW.testperformed_dt,
NEW.testresultsreleased_dt,
NEW.extractedfromsourceat_dt,
NEW.birthdate_d;
-- Return new back to the streaming view as we don't want that process to error. We already routed the record above to the errors table as text.
RETURN NEW;
END;
$$
LANGUAGE plpgsql;
DROP VIEW IF EXISTS pdb.lis_tryparse CASCADE;
CREATE VIEW pdb.lis_tryparse WITH (action=transform, outputfunc=insert_into_t) AS
SELECT
edm___row_id,
patient_id,
encounter_id,
order_id,
sample_id,
container_id,
result_id,
orderrequestcode,
orderrequestname,
testresultcode,
testresultname,
testresultcost,
testordered_dt,
samplereceived_dt,
testperformed_dt,
testresultsreleased_dt,
extractedfromsourceat_dt,
birthdate_d
FROM pdb.lis_streaming_table as st;