Динамическое изменение имен таблиц для источника и назначения в потоке мула для загрузки данных из SQL серверной базы данных в снежинку - PullRequest
1 голос
/ 23 января 2020

У меня есть требование для извлечения и загрузки (E & L) данных из источника SQL Сервер MS Dynamics в Snowflake. Всего более 200 таблиц. Здесь нет преобразования, и это только однозначное сопоставление и выгрузка данных в промежуточные таблицы Snowflake, такие как TRUNCATE и load.

Я новичок в Mule и боялся, что мне нужно создать 200 потоков для E & L. После небольшого исследования и с моим предыдущим опытом в SSIS я искал создать только 1 поток в mule, где я могу динамически изменять запрос исходной таблицы и запрос целевой таблицы. После проведенного дня смог сделать это. Вот полный код, который может помочь другим людям.

Для динамического изменения исходного и целевого запросов я создал таблицу с именем _Config и сохранил исходные и целевые запросы для каждой таблицы и извлекал их в потоке и выполнял в пакетном режиме. Этот подход сэкономит много моего времени, и мне не нужно создавать 200 потоков. также в будущем, если мне нужно будет добавить больше таблиц как часть этого процесса, мне просто нужно добавить их в таблицу конфигурации.

Также я работаю над тем, чтобы захватить журнал загрузки данных для каждой загрузки таблицы и вставить в таблицу. Мне нужна помощь, поэтому разместил вопрос здесь: Хотите записать журнал загрузки данных в таблицу SQL или Snowflake в Mule Flow

В дополнение к выше, хотел бы захватить журнал ошибок и Хранить в столе. Хотите записать журнал ошибок для потока мулов и ищите следующую информацию

CREATE TABLE [dbo].[Shivendoo_Config](
	[Id] [int] IDENTITY(1,1) NOT NULL,
	[GroupId] [int] NOT NULL,
	[GroupName] [varchar](400) NOT NULL,
	[SourceTable] [varchar](400) NOT NULL,
	[SourceQuery] [varchar](max) NOT NULL,
	[DestinationTable] [varchar](400) NOT NULL,
	[DestinationQuery] [varchar](max) NOT NULL,
	[IsIncluded] [bit] NOT NULL,
	[IsManualRun] [bit] NOT NULL,
	[FullLoadFlag] [bit] NOT NULL,
	[IsTruncate] [bit] NOT NULL,
	[TruncateQuery] [varchar](max) NULL,
	[DeltaLoadFlag] [bit] NOT NULL,
	[IsDelete] [bit] NOT NULL,
	[DeleteQuery] [varchar](max) NULL,
	[DeltaColumn] [varchar](400) NULL,
	[DeltaValueFrom] [varchar](400) NULL,
	[DeltaValueTo] [varchar](400) NULL,
	[Comments] [varchar](400) NULL,
	[CreatedDateTime] [datetime] NOT NULL,
	[ModifiedDateTime] [datetime] NOT NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO

ALTER TABLE [dbo].[Shivendoo_Config] ADD  DEFAULT ((0)) FOR [IsManualRun]

USE [DB]
GO
SET IDENTITY_INSERT [dbo].[Shivendoo_Config] ON 
GO
INSERT [dbo].[Shivendoo_Config] ([Id], [GroupId], [GroupName], [SourceTable], [SourceQuery], [DestinationTable], [DestinationQuery], [IsIncluded], [IsManualRun], [FullLoadFlag], [IsTruncate], [TruncateQuery], [DeltaLoadFlag], [IsDelete], [DeleteQuery], [DeltaColumn], [DeltaValueFrom], [DeltaValueTo], [Comments], [CreatedDateTime], [ModifiedDateTime]) VALUES (1, 1, N'Full Load-1', N'dbo.ShivendooSource1', N'SELECT *, Getdate() AS Current_DateTime FROM [dbo].[ShivendooSource1]', N'STAGING.ShivendooDestination1', N'INSERT INTO STAGING.ShivendooDestination1 (FirstName, ETL_LoadDate) VALUES (:FirstName, :Current_DateTime)', 1, 0, 1, 1, N'TRUNCATE TABLE STAGING.ShivendooDestination1', 0, 0, NULL, NULL, NULL, NULL, N'This is Full Load Group-1', CAST(N'2020-01-22T14:04:11.387' AS DateTime), CAST(N'2020-01-22T14:04:11.387' AS DateTime))
GO
INSERT [dbo].[Shivendoo_Config] ([Id], [GroupId], [GroupName], [SourceTable], [SourceQuery], [DestinationTable], [DestinationQuery], [IsIncluded], [IsManualRun], [FullLoadFlag], [IsTruncate], [TruncateQuery], [DeltaLoadFlag], [IsDelete], [DeleteQuery], [DeltaColumn], [DeltaValueFrom], [DeltaValueTo], [Comments], [CreatedDateTime], [ModifiedDateTime]) VALUES (2, 1, N'Full Load-1', N'dbo.ShivendooSource2', N'SELECT *, Getdate() AS Current_DateTime FROM [dbo].[ShivendooSource2]', N'STAGING.ShivendooDestination2', N'INSERT INTO STAGING.ShivendooDestination2 (LastName, ETL_LoadDate) VALUES (:LastName, :Current_DateTime)', 1, 0, 1, 1, N'TRUNCATE TABLE STAGING.ShivendooDestination2', 0, 0, NULL, NULL, NULL, NULL, N'This is Full Load Group-1', CAST(N'2020-01-22T14:04:11.387' AS DateTime), CAST(N'2020-01-22T14:04:11.387' AS DateTime))
GO
INSERT [dbo].[Shivendoo_Config] ([Id], [GroupId], [GroupName], [SourceTable], [SourceQuery], [DestinationTable], [DestinationQuery], [IsIncluded], [IsManualRun], [FullLoadFlag], [IsTruncate], [TruncateQuery], [DeltaLoadFlag], [IsDelete], [DeleteQuery], [DeltaColumn], [DeltaValueFrom], [DeltaValueTo], [Comments], [CreatedDateTime], [ModifiedDateTime]) VALUES (3, 2, N'Delta Load-1', N'dbo.ShivendooSource3', N'SELECT * FROM [dbo].[ShivendooSource3]', N'STAGING.ShivendooDestination3', N'INSERT INTO STAGING.ShivendooDestination3 (FullName, ModifiedDateTime, ETL_LoadDate) VALUES (:FullName, :ModifiedDateTime, :ETL_LoadDate)', 1, 0, 0, 0, NULL, 1, 1, N'DELETE FROM STAGING.ShivendooDestination3 WHERE ModifiedDateTime BETWEEN ''2019-12-01'' AND ''2019-12-31''', N'ModifiedDateTime', N'2019-12-01', N'2019-12-31', N'Delta Load Group-1', CAST(N'2020-01-22T14:14:03.643' AS DateTime), CAST(N'2020-01-22T14:14:03.643' AS DateTime))
GO
INSERT [dbo].[Shivendoo_Config] ([Id], [GroupId], [GroupName], [SourceTable], [SourceQuery], [DestinationTable], [DestinationQuery], [IsIncluded], [IsManualRun], [FullLoadFlag], [IsTruncate], [TruncateQuery], [DeltaLoadFlag], [IsDelete], [DeleteQuery], [DeltaColumn], [DeltaValueFrom], [DeltaValueTo], [Comments], [CreatedDateTime], [ModifiedDateTime]) VALUES (4, 2, N'Delta Load-1', N'dbo.ShivendooSource4', N'SELECT * FROM [dbo].[ShivendooSource4]', N'STAGING.ShivendooDestination4', N'INSERT INTO STAGING.ShivendooDestination4 (Name, ModifiedDateTime, ETL_LoadDate) VALUES (:Name, :ModifiedDateTime, :ETL_LoadDate)', 1, 0, 0, 0, NULL, 1, 1, N'DELETE FROM STAGING.ShivendooDestination4 WHERE ModifiedDateTime BETWEEN ''2020-01-01'' AND ''2020-01-22''', N'ModifiedDateTime', N'2020-01-01', N'2020-01-22', N'Delta Load Group-1', CAST(N'2020-01-22T14:15:03.100' AS DateTime), CAST(N'2020-01-22T14:15:03.100' AS DateTime))
GO
SET IDENTITY_INSERT [dbo].[Shivendoo_Config] OFF
GO

CREATE TABLE [dbo].[ShivendooSource1](
	[Id] [int] IDENTITY(1,1) NOT NULL,
	[FirstName] [varchar](50) NULL,
	[ETL_LoadDate] [datetime] NOT NULL
) ON [PRIMARY]

CREATE TABLE [STAGING].[ShivendooDestination1](
	[Id] [int] IDENTITY(1,1) NOT NULL,
	[FirstName] [varchar](50) NULL,
	[ETL_LoadDate] [datetime] NOT NULL
) ON [PRIMARY]

Поток: enter image description here

Код:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns:db="http://www.mulesoft.org/schema/mule/db"
	xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd">
	<flow name="From_SQL_Table_To_Snowflake_Table_FullLoad_Flow" doc:id="d1a52bb0-1014-4d18-8528-7077b9807c7f" >
		<scheduler doc:name="Scheduler - Runs Every 10 Seconds" doc:id="5ac8b208-57ec-4b58-aa7c-c411bde3c1ce" >
			<scheduling-strategy >
				<fixed-frequency frequency="10" timeUnit="SECONDS"/>
			</scheduling-strategy>
		</scheduler>
		<db:select doc:name="Select Flow Configuration" doc:id="cd8588b6-0458-430e-9779-3bba33f02065" config-ref="Database_Config-1" queryTimeout="10" queryTimeoutUnit="MINUTES">
			<db:sql >SELECT [Id], 
       [GroupId], 
       [GroupName], 
       [SourceTable], 
       [SourceQuery], 
       [DestinationTable], 
       [DestinationQuery], 
       [IsIncluded], 
       [IsManualRun], 
       [FullLoadFlag], 
       [IsTruncate], 
       [TruncateQuery], 
       [DeltaLoadFlag], 
       [IsDelete], 
       [DeleteQuery], 
       [DeltaColumn], 
       [DeltaValueFrom], 
       [DeltaValueTo], 
       [Comments], 
       [CreatedDateTime], 
       [ModifiedDateTime]
FROM [dbo].[Shivendoo_Config]
WHERE [GroupName] = 'Full Load-1'
      AND [IsIncluded] = 1
	  AND [FullLoadFlag] = 1
	  AND [IsTruncate] = 1</db:sql>
		</db:select>
		<logger level="INFO" doc:name="Logger to Get Size Of Payload" doc:id="60cbd4bc-f077-4854-9156-d5726561eb0e" message="Size Of Payload: #[sizeOf(payload)]" />
		<batch:job jobName="Master_Flow_To_Load_Data_From_SQL_Tables_To_Snowflake_TablesBatch_Job" doc:id="7381a2dd-d194-478a-bb22-31c244a3fedb" >
			<batch:process-records >
				<batch:step name="Batch_Step" doc:id="25105496-4d87-48f6-b86b-ec8635ff01fe" >
					<set-variable value="#[payload.SourceQuery]" doc:name="Set Variable to Get SourceQuery" doc:id="87ff6d56-663b-4783-a76c-a9c7f88857e4" variableName="VSourceQuery"/>
					<set-variable value="#[payload.DestinationQuery]" doc:name="Set Variable to Get DestinationQuery" doc:id="f335c1f3-9a2b-4544-8ad7-7ccb11a20977" variableName="VDestinationQuery"/>
					<set-variable value="#[payload.TruncateQuery]" doc:name="Set Variable to Get TruncateQuery" doc:id="9b359b5e-8280-4e26-88ee-8e7b68215782" variableName="VTruncateQuery"/>
					<logger level="INFO" doc:name="Logger to Get SourceQuery and DestinationQuery From Batch" doc:id="13ceec10-acc6-4308-b135-833b21c34e40" message="Source Table and DestinationQuery From Batch: #[vars.VSourceQuery] #[vars.VDestinationQuery]" />
					<db:execute-script doc:name="Execute script To Truncate Destination Table" doc:id="4fa31672-f5b6-4432-a490-128950c970f8" config-ref="Database_Config-1">
						<db:sql >#['$(vars.VTruncateQuery)'] </db:sql>
					</db:execute-script>
					<db:select doc:name="Select From Table Dynamic" doc:id="187ac9b7-9b94-4c97-b893-7113ab903104" config-ref="Database_Config-1">
				<db:sql>#['$(vars.VSourceQuery)'] </db:sql>
			
</db:select>
					<logger level="INFO" doc:name="Logger to Get Size Of Payload" doc:id="a7b61a28-192d-4c7c-bad2-dcf04f791b00" message="Record Counts: #[payload.successfulRecords]" />
					<db:bulk-insert doc:name="Bulk insert to Dynamic Tables" doc:id="7a7419d8-f1df-41b0-b705-7fc9a6f92fce" config-ref="Database_Config-1">
				<db:sql>#[' $(vars.VDestinationQuery)']
</db:sql>
			</db:bulk-insert>
				</batch:step>
			</batch:process-records>
		</batch:job>
	
</flow>

</mule>
...