Изменение сбора данных с использованием Spark SQL - PullRequest
0 голосов
/ 18 июня 2019

У меня есть несколько таблиц, которые связаны как A -> Left Join -> B -> Left join -> C. Давайте назовем A как driving table и B & C как "supporting" таблицы. Каждая из этих таблиц имеет столбец last_update_date. Мое требование состоит в том, чтобы идентифицировать записи, которые изменились со времени последней обработки (доступно в качестве параметра) не только в таблице управления, но также и в том случае, если в вспомогательной таблице (таблицах) произойдет изменение какого-либо столбца.

Table A  
------  
empid|salary|last_updt_dt  
123|20000|05/14/2019   

Table B  
-------  
empid|fname|lname|last_updt_date  
123|John|Taylor|05/16/2019  

Table C  
-------  
empid|address|last_updt_dt  
123|Maryland|05/17/2019  

Предположим, = 05/10/2019

Итак, при условии выполнения задания в День 1 (20/05/2019) выходные данные должны быть:

empid|fname|lname|salary|address|last_exec_date  
-----------------------------------------------    
123|John|Taylor|20000|Maryland|05/20/2019 

Теперь давайте предположим, что во второй день (21.05.2009) адрес был изменен с Maryland на California. Итак, во второй день выходная таблица должна выглядеть следующим образом:

empid|fname|lname|salary|address|last_exec_date  
-----------------------------------------------    
123|John|Taylor|20000|Maryland|05/20/2019 
123|John|Taylor|20000|California|05/21/2019 
561|Peter|Anderson|50000|Missouri|05/21/2019

Следует отметить, что во второй день изменение в любой «вспомогательной таблице» (в данном случае в столбце «Адрес» таблицы C) привело к вставке другой записи, которая уже была обработана ранее вчера, но теперь с обновленным значением в адресная колонка. Также обратите внимание, что во второй день другие вставки будут происходить как есть, как обычные вставки для любой другой соответствующей записи (если есть), например. EmpID = 561.

SELECT
A.empid, B.fname, B.lname, A.salary, C.address, current_date() as last_exec_date
from A
left outer join B
on A.empid = B.empid
left outer join B.empid = C.empid
where to_date(A.last_updt_dt, 'yyyyMMdd') > {last_exec_date}
OR to_date(A.last_updt_dt, 'yyyyMMdd') > {last_exec_date}
to_date(A.last_updt_dt, 'yyyyMMdd') > {last_exec_date}

Моя задача состоит в том, чтобы инициировать и распространять любые изменения из любой участвующей вспомогательной таблицы, даже если это изменение относится к записи, которая была обработана и вставлена ​​в целевую таблицу ранее, чтобы новая запись с обновленным значением показывает в целевой таблице. Другими словами, как я могу вызвать запись с изменением из любой другой таблицы поддержки (не драйвера)

...