Проблема с передачей базы данных в SSIS - PullRequest
2 голосов
/ 03 февраля 2020

Я создал пакет, который переносит данные из старой базы данных в новую. Это выглядит примерно так:

tblSupplier -> Поставщики

tblNaturalKey -> InvoiceNaturalKey

tblInvoice -> Invoices ...

Имена таблиц не являются проблема, поскольку они могут быть жестко запрограммированы. Я хочу автоматизировать это для многих клиентов, и у каждого есть свой собственный естественный ключ для таблицы счетов.

Например, [InvoiceNumber], [AccountNumber] ....

Таким образом, для каждого клиента я создал пакет, в котором есть задача execute SQL, которая получает информацию о столбце из tblNaturalKey, сохраняет ее как объект и затем реализует ADO l oop для чтения объекта и создания каждого настраиваемого столбца в новом Натуральный Ключ стол. Я надеялся, что тогда пакет передачи сможет просто подобрать сопоставления, основанные на равенстве имен столбцов. Это Dynami c, поскольку я могу просто ввести старый и новый сервер и базу данных в качестве переменных среды и запустить задание из каталога сервера. Но столбцы не отображаются, и я получаю ошибки метаданных. Я приложу свой исходный код biml. Пожалуйста помоги! KR Marcus

<Biml xmlns="http://schemas.varigence.com/biml.xsd">
   <Packages>
        <Package Name="0-0-Nat_Key_Columns" ConstraintMode="Linear">
            <Connections>
                <Connection ConnectionName="OldDatabase"></Connection>
            </Connections>
            <Variables>
                <Variable Name="Columns" DataType="Object" Namespace="User"></Variable>
                <Variable Name="ColName" DataType="String" Namespace="User">op</Variable>
                <Variable Name="DataType" DataType="String" Namespace="User">int</Variable>
                <Variable Name="DTdesc" DataType="String" Namespace="User"></Variable>
            </Variables>
            <Tasks>
                <ExecuteSQL Name="ES-GetMissingCols" ConnectionName="OldDatabase" ResultSet="Full">
                    <DirectInput>
                              select i.COLUMN_NAME, i.DATA_TYPE, 
                             case   
                                when i.data_type = 'varchar' then CAST(concat('(',i.CHARACTER_MAXIMUM_LENGTH,')') AS varchar(10))
                                when i.DATA_TYPE = 'int' then ''
                                when i.DATA_TYPE = 'datetime' then ''
                                when i.DATA_TYPE = 'decimal' then cast(concat('(',i.NUMERIC_PRECISION,',',i.NUMERIC_SCALE,')') as varchar(10) )
                                else ''
                            end AS DTdesc 
                            from INFORMATION_SCHEMA.COLUMNS i
                            where i.TABLE_NAME = 'tblsys_naturalkey_lu'  
                    </DirectInput>
                    <Results>
                        <Result VariableName="User.Columns" Name="0"></Result>
                    </Results>
                </ExecuteSQL>
                <ForEachAdoLoop Name="ForEachADO-ColumnLoop" SourceVariableName="User.Columns" EnumerationMode="EnumerateRowsInFirstTable">
                    <VariableMappings>
                        <VariableMapping Name="0" VariableName="User.ColName"></VariableMapping>
                        <VariableMapping Name="1" VariableName="User.DataType"></VariableMapping>
                        <VariableMapping Name="2" VariableName="User.DTdesc"></VariableMapping>
                    </VariableMappings>
                    <Tasks>
                        <ExecuteSQL Name="ES-AddMissingCols" ConnectionName="NewDatabase">
                            <DirectInput></DirectInput>
                            <Expressions>
                                <Expression ExternalProperty="SqlStatementSource">
                                    "IF NOT EXISTS (
                                      SELECT
                                        *
                                      FROM
                                        INFORMATION_SCHEMA.COLUMNS
                                      WHERE
                                        TABLE_NAME = 'InvoiceNaturalKey' AND COLUMN_NAME = '"+ @[User::ColName]+ "')
                                    BEGIN
                                      ALTER TABLE InvoiceNaturalKey
                                        ADD ["+ @[User::ColName] + "] " + @[User::DataType] +  (DT_WSTR,5)@[User::DTdesc]  +" 
                                    END;"
                                </Expression>
                            </Expressions>
                        </ExecuteSQL>
                    </Tasks>
                </ForEachAdoLoop>
            </Tasks>
        </Package>
        <Package Name="1-3-Nat_Key" ConstraintMode="Linear" ProtectionLevel="DontSaveSensitive">
            <Tasks>
                <Dataflow Name="DFT-NatKey Transfer">
                    <Transformations>
                        <OleDbSource Name="ODS-NK" ConnectionName="OldDatabase" ValidateExternalMetadata="false">
                            <DirectInput>
                                SELECT * FROM [dbo].[tblsys_naturalkey_lu]
                            </DirectInput>
                        </OleDbSource>
                        <OleDbDestination Name="ODD-NK" ConnectionName="NewDatabase" KeepIdentity="true">
                            <SqlCommandOutput>select * from InvoiceNaturalKey</SqlCommandOutput>
                        </OleDbDestination>
                    </Transformations>
                </Dataflow>
            </Tasks>
        </Package>
    </Packages>
</Biml>
<#@ template language="C#" tier="2"#>

1 Ответ

1 голос
/ 10 февраля 2020

Я недавно решил эту проблему, используя задачу сценария execute process / Python вместо задачи потока данных. Таким образом, я смог передать параметр проекта в скрипт python и решить проблему с отображением. Я использовал метод executemany () из библиотеки pyodb c. Пожалуйста, смотрите код ниже.

# -*- coding: utf-8 -*-
"""
Created on Mon Feb  3 13:00:48 2020

@author: MTrinder
"""
import sys
import pyodbc

#dummy arguments, sys arguments will replace
args = ['','[natkey1],[natkey2],[natkey3],[natKey4],[natKey5],[natKey6]','old_server','old_db','new_server','new_db']
for i,arg in enumerate(sys.argv):
    args[i] = arg

#map arguments
cols_str = args[1]
old_server = args[2]
old_db = args[3]
new_server = args[4]
new_db = args[5]

#extract data to be inserted
sql_conn = pyodbc.connect("DRIVER={SQL Server Native Client 11.0};SERVER="+old_server+";DATABASE="+old_db+";Trusted_Connection=yes") 
query = """ SELECT * from tblsys_naturalkey_lu"""

import pandas as pd
nk = pd.read_sql_query(query, sql_conn)
#data to strings
nk =  nk.astype(str)

#col string

cols_str_b = cols_str.replace('[','')
cols_str_b = cols_str_b.replace(']','')

cols = cols_str.split(',')
cols_b = cols_str_b.split(',')
#number of nk cols
n_cols = len(cols)
#sql insert values
vals = ''
for i in range(n_cols):
    if i<n_cols-1:
        vals = vals + '?,'
    else:
        vals = vals +'?'

#new databse connection
connStr = pyodbc.connect('DRIVER={SQL Server Native Client 11.0};SERVER='+new_server+';DATABASE='+new_db+';Trusted_Connection=yes')
cursor = connStr.cursor()
#data to list
import numpy as np
data = np.array(nk).tolist()
import math
#create batches
n_batches = 30 #optimal batch size for 10M records
batch_size = math.floor(len(nk) / n_batches)
#insert rows
for i in range(n_batches+1):
    if i != n_batches:
        sql = "set identity_insert [InvoiceNaturalKey] on INSERT INTO dbo.InvoiceNaturalKey("+ cols_str +") values ("+vals+") set identity_insert [InvoiceNaturalKey] off"
        cursor.fast_executemany = True
        cursor.executemany(sql, data[i*batch_size:((i+1)*batch_size)])  
    else:
        cursor.executemany(sql, data[n_batches*batch_size:])
    connStr.commit()
cursor.close()
connStr.close()









Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...