Как я могу загрузить большой плоский файл в таблицу базы данных, используя SSIS? - PullRequest
11 голосов
/ 19 мая 2011

Я не уверен, как это работает, поэтому я ищу правильное решение. Я думаю, что SSIS - верный путь, но я никогда не использовал его раньше

Сценарий:

Каждое утро я получаю файл с разделителями табуляции с записями 800K. Мне нужно загрузить его в мою базу данных:

  1. Получить файл с ftp или локально
  2. Сначала мне нужно удалить из базы данных файл, которого нет в новом файле;
    • Как я могу сравнить данные в ЦКЛ
    • Где мне загрузить данные из файла с разделителями табуляции, чтобы сравнить их с файлом? Должен ли я использовать временную таблицу? ItemID - уникальный столбец в таблице.
  3. Во-вторых, мне нужно вставить только новые записи в базу данных.
  4. Конечно, это должно быть автоматизировано.
  5. Это должен быть эффективный способ без перегрева базы данных SQL

Не забывайте, что файл содержит 800K записей.

Пример данных плоского файла:

ID  ItemID  ItemName  ItemType
--  ------  --------  --------
 1  2345    Apple     Fruit
 2  4578    Banana    Fruit

Как я могу подойти к этой проблеме?

Ответы [ 5 ]

19 голосов
/ 28 мая 2011

Да, SSIS может выполнять требования, которые вы указали в вопросе.Следующий пример должен дать вам представление о том, как это можно сделать.Пример использует SQL Server в качестве серверной части.Некоторые из основных тестовых сценариев, выполняемых на пакете, представлены ниже.Извините за длинный ответ.

Пошаговый процесс:

  1. В базе данных SQL Server создайте две таблицы, а именно dbo.ItemInfoи dbo.Staging.Запросы на создание таблиц доступны в разделе Scripts .Структура этих таблиц показана на скриншоте # 1 .ItemInfo будет содержать фактические данные, а таблица Staging будет содержать промежуточные данные для сравнения и обновления фактических записей.Столбец Id в обеих этих таблицах представляет собой автоматически генерируемый столбец уникальных идентификаторов.Столбец IsProcessed в таблице ItemInfo будет использоваться для идентификации и удаления недействительных записей.

  2. Создайте пакет служб SSIS и создайте 5 переменных, как показано на скриншоте # 2 .Я использовал расширение .txt для файлов с разделителями табуляции и, следовательно, значение *.txt в переменной FileExtension .FilePath переменной будет присвоено значение во время выполнения.FolderLocation переменная указывает, где файлы будут расположены.Переменные SQLPostLoad и SQLPreLoad обозначают хранимые процедуры, используемые во время операций до и после загрузки.Сценарии для этих хранимых процедур предоставляются в разделе Сценарии .

  3. Создание подключения OLE DB, указывающего на базу данных SQL Server.Создайте соединение с плоским файлом, как показано на скриншотах # 3 и # 4 . Столбцы подключения к плоскому файлу раздел содержит информацию об уровне столбца.Снимок экрана # 5 показывает предварительный просмотр данных столбцов.

  4. Настройте задачу потока управления, как показано на скриншоте # 6 .Сконфигурируйте задачи Pre Load, Post Load и Loop Files, как показано на скриншотах # 7 - # 10 .Предварительная загрузка усекает промежуточную таблицу и устанавливает для флага IsProcessed значение false для всех строк в таблице ItemInfo.Post Load обновит изменения и удалит строки в базе данных, которые не найдены в файле.Обратитесь к хранимым процедурам, используемым в этих задачах, чтобы понять, что делается в этих Execute SQL задачах.

  5. Дважды щелкните задачу потока данных Load Items и настройте ее, как показано на скриншоте# 11 .Read File - это источник плоских файлов, настроенный для использования соединения с плоскими файлами.Row Count является производным преобразованием столбца, и его конфигурация показана на экране # 12 .Check Exist является преобразованием поиска, и его конфигурации показаны на скриншотах # 13 - # 15 . Lookup No Match Output перенаправляется на Destination Split на левой стороне. Вывод соответствия поиска перенаправляется на Staging Split на левой стороне.Destination Split и Staging Split имеют точно такую ​​же конфигурацию, как показано на скриншоте # 16 .Причиной для 9 различных назначений как для таблицы назначения, так и для промежуточной таблицы является повышение производительности пакета.

  6. Все задачи назначения 0 - 8 настроены для вставки данных в таблицу dbo.ItemInfoкак показано на скриншоте # 17 .Все промежуточные задания 0 - 8 настроены на вставку данных в dbo.Staging, как показано на скриншоте # 18 .

  7. В диспетчере соединений с плоскими файлами установитеСвойство ConnectionString для использования переменной FilePath, как показано на скриншоте # 19 .Это позволит пакету использовать значение, установленное в переменной, при циклическом просмотре каждого файла в папке.

Тестовые сценарии:

Test results may vary from machine to machine. 
In this scenario, file was located locally on the machine. 
Files on network might perform slower. 
This is provided just to give you an idea. 
So, please take these results with grain of salt.

Пакет был выполнен на 64-разрядной машине с одноядерным процессором Xeon 2,5 ГГц и 3,00 ГБ ОЗУ.

Загружен плоский файл с 1 million rows.Пакет исполняется примерно за 2 минуты 47 секунд .Смотрите скриншоты # 20 и # 21 .

Использовал запросы, представленные в разделе Тестовые запросы , чтобы изменить данные для имитации обновления, удаления и создания новых записей во время второго запуска пакета.

Загружен тот же файл, содержащий 1 million rows после выполнения следующих запросов в базе данных.Пакет исполняется примерно за 1 мин 35 с .Смотрите скриншоты # 22 * ​​1140 * и # 23 .Обратите внимание на количество строк, перенаправленных на место назначения и промежуточную таблицу на скриншоте # 22 * ​​1144 *.

Надеюсь, это поможет.

Тестовые запросы: .

--These records will be deleted during next run 
--because item ids won't match with file data.
--(111111 row(s) affected)
UPDATE dbo.ItemInfo SET ItemId = 'DEL_' + ItemId WHERE Id % 9 IN (3)

--These records will be modified to their original item type of 'General'
--because that is the data present in the file.
--(222222 row(s) affected)
UPDATE dbo.ItemInfo SET ItemType = 'Testing' + ItemId WHERE Id % 9 IN (2,6)

--These records will be reloaded into the table from the file.
--(111111 row(s) affected)
DELETE FROM dbo.ItemInfo WHERE Id % 9 IN (5,9)

Столбцы подключения к плоскому файлу .

Name        InputColumnWidth     DataType          OutputColumnWidth
----------  ----------------     ---------------   -----------------
Id          8                    string [DT_STR]   8
ItemId      11                   string [DT_STR]   11
ItemName    21                   string [DT_STR]   21
ItemType    9                    string [DT_STR]   9

Сценарии: (для создания обеих таблици хранимые процедуры) .

CREATE TABLE [dbo].[ItemInfo](
    [Id] [int] IDENTITY(1,1) NOT NULL,
    [ItemId] [varchar](255) NOT NULL,
    [ItemName] [varchar](255) NOT NULL,
    [ItemType] [varchar](255) NOT NULL,
    [IsProcessed] [bit] NULL,
    CONSTRAINT [PK_ItemInfo] PRIMARY KEY CLUSTERED ([Id] ASC),
    CONSTRAINT [UK_ItemInfo_ItemId] UNIQUE NONCLUSTERED ([ItemId] ASC)) ON [PRIMARY]
GO

CREATE TABLE [dbo].[Staging](
    [Id] [int] IDENTITY(1,1) NOT NULL,
    [ItemId] [varchar](255) NOT NULL,
    [ItemName] [varchar](255) NOT NULL,
    [ItemType] [varchar](255) NOT NULL,
 CONSTRAINT [PK_Staging] PRIMARY KEY CLUSTERED ([Id] ASC)) ON [PRIMARY]
GO

CREATE PROCEDURE [dbo].[PostLoad]
AS
BEGIN
    SET NOCOUNT ON;

    UPDATE      ITM
    SET         ITM.ItemName    = STG.ItemName
            ,   ITM.ItemType    = STG.ItemType 
            ,   ITM.IsProcessed = 1
    FROM        dbo.ItemInfo    ITM
    INNER JOIN  dbo.Staging     STG
    ON          ITM.ItemId      = STG.ItemId;

    DELETE FROM dbo.ItemInfo
    WHERE       IsProcessed = 0;
END
GO

CREATE PROCEDURE [dbo].[PreLoad]
AS
BEGIN
    SET NOCOUNT ON;

    TRUNCATE TABLE dbo.Staging;     

    UPDATE  dbo.ItemInfo 
    SET     IsProcessed = 0;
END
GO

Снимок экрана № 1:

1

Снимок экрана № 2:

2

Снимок экрана № 3:

3

Снимок экрана № 4:

4

Снимок экрана № 5:

5

Снимок экрана № 6:

6

Снимок экрана № 7:

7

Снимок экрана № 8:

8

Снимок экрана № 9:

9

Снимок экрана № 10:

10

Снимок экрана № 11:

11

Снимок экрана № 12:

12

Снимок экрана № 13:

13

Снимок экрана № 14:

14

Снимок экрана № 15:

15

Снимок экрана № 16:

16

Снимок экрана № 17:

17

Снимок экрана № 18:

18

Снимок экрана № 19:

19

Снимок экрана № 20:

20

Снимок экрана № 21:

21

Снимок экрана № 22:

22

Снимок экрана № 23:

23

2 голосов
/ 20 мая 2011

Если вы используете агент SQL (или аналогичный планировщик)

Требуется 1/4) У меня есть шаг предшественника для обработки шагов FTP и / или копирования файлов. Я не люблю загромождать свои пакеты манипуляциями с файлами, если я могу этого избежать.

Треб. 2/3) На уровне потока управления дизайн пакета будет выглядеть как задача «Выполнение SQL», подключенная к потоку данных, подключенному к другой задаче «Выполнение SQL». Как указывало @AllenG, вам лучше всего загрузиться в промежуточную таблицу с помощью задачи потока данных. Первая задача «Выполнение SQL» удалит все строки из промежуточной таблицы (TRUNCATE TABLE dbo.DAILY_STAGE)

Примерный дизайн стола выглядит следующим образом. Таблица MICHAEL_BORN - это ваша существующая таблица, а DAILY_STAGE - место, куда будет направлен ваш поток данных.

CREATE TABLE DBO.MICHAEL_BORN
(
    ID int identity(1,1) NOT NULL PRIMARY KEY CLUSTERED
,   ItemID int NOT NULL
,   ItemName varchar(20) NOT NULL
,   ItemType varchar(20) NOT NULL
)
CREATE TABLE dbo.DAILY_STAGE
(
    ItemID int NOT NULL PRIMARY KEY CLUSTERED
,   ItemName varchar(20) NOT NULL
,   ItemType varchar(20) NOT NULL
)

В демонстрационных целях я буду загружать приведенные выше таблицы с примерами данных через TSQL

-- Original data
INSERT INTO
    dbo.MICHAEL_BORN
VALUES
    (2345,'Apple','Fruit')
,   (4578, 'Bannana','Fruit')


-- Daily load runs
-- Adds a new fruit (pear), corrects misspelling of banana, eliminates apple
INSERT INTO
    dbo.DAILY_STAGE
VALUES
    (7721,'Pear','Fruit')
,   (4578, 'Banana','Fruit')

В задаче «Выполнение SQL» будет использоваться оператор MERGE , доступный в выпусках SQL Server 2008+. Обратите внимание, что конечная точка с запятой является частью оператора MERGE. Если его не включить, произойдет ошибка «Оператор MERGE должен быть завершен точкой с запятой (;)».

-- MERGE statement
-- http://technet.microsoft.com/en-us/library/bb510625.aspx
-- Given the above scenario, this script will
-- 1)  Update the matched (4578 bannana/banana) row
-- 2)  Add the new (pear) row
-- 3)  Remove the unmatched (apple) row

MERGE
    dbo.[MICHAEL_BORN] AS T
USING
(
    SELECT
        ItemID
    ,   ItemName
    ,   ItemType
    FROM
        dbo.DAILY_STAGE

) AS S
ON T.ItemID = S.ItemID
WHEN
    MATCHED THEN
    UPDATE
    SET
        T.ItemName = S.ItemName
    ,   T.ItemType = S.ItemType
WHEN
    NOT MATCHED THEN
    INSERT
    (
        ItemID
    ,   ItemName
    ,   ItemType
    )
    VALUES
    (
        ItemID
    ,   ItemName
    ,   ItemType
    )
WHEN
    NOT MATCHED BY SOURCE THEN
    DELETE
    ;

Треб. 5) Эффективность полностью зависит от ваших данных и ширины строк, но это не должно быть ужасно.

-- Performance testing
-- Assumes you have a similar fast row number generator function
-- http://billfellows.blogspot.com/2009/11/fast-number-generator.html

TRUNCATE TABLE dbo.MICHAEL_BORN
TRUNCATE TABLE dbo.DAILY_STAGE

-- load initial rows
-- 20ish seconds
INSERT INTO
    dbo.MICHAEL_BORN
SELECT
    N.number AS ItemID
,   'Spam & eggs ' + CAST(N.number AS varchar(10)) AS ItemName
,   'SPAM' AS ItemType
--, CASE N.number % 2 WHEN 0 THEN N.number + 1000000 ELSE N.number END AS UpTheEvens
FROM
    dbo.GenerateNumbers(1000000) N


-- Load staging table
-- Odds get item type switched out
-- Evens get delete and new ones created
-- 20ish seconds
INSERT INTO
    dbo.DAILY_STAGE
SELECT
    CASE N.number % 2 WHEN 0 THEN N.number + 1000000 ELSE N.number END AS ItemID
,   'Spam & eggs ' + CAST(N.number AS varchar(10)) AS ItemName
,   CASE N.number % 2 WHEN 0 THEN 'SPAM' ELSE 'Not much spam' END AS ItemType
FROM
    dbo.GenerateNumbers(1000000) N


-- Run MERGE statement, 32 seconds 1.5M rows upserted
-- Probably fast enough for you
1 голос
/ 17 мая 2015

Я просто хочу дать свою идею следующему парню, который может пройти мимо этого вопроса.Поэтому я собираюсь предложить свою идею для каждого сценария.
1. Getfile с FTP или локально.
Я бы предложил вам использовать Dropbox, Google Drive или любые другие облачные службы синхронизации файлов по вашему выбору, см. эта ссылка для подробностей.
2. Я бы предложил загрузить все данные плоских файлов в промежуточную таблицу, как вы предложили. Тогда сравнение данных будет легко сделать с помощью MERGE между вашей промежуточной таблицей и целевой таблицей на вашем уникальномстолбец (ID).Вы можете увидеть эту ссылку о том, как использовать скрипт слияния.2-й и 3-й сценарии будут решены, если вы используете сценарий MERGE.
Для последних двух сценариев я предлагаю использовать SQL JOB для автоматического запуска пакета и планирования его в нерабочее время или в нерабочее время, когда сервер не занят.Пожалуйста, взгляните на ссылку для получения подробной информации о том, как запустить пакет с помощью задания агента SQL Server, просто наберите его в своей любимой поисковой системе, и вы найдете тонны блогов, которые показывают, как это делается.

0 голосов
/ 09 мая 2013

Я бы дал Merge шанс. Убедитесь, что у вас есть индексы ItemID для обеих таблиц.

Merge [dbo].[ItemInfo] as target
using
(
    SELECT stg.ItemID, stg.ItemName, stg.ItemType
    FROM [dbo].[ItemInfo_Staging] stg
    LEFT OUTER JOIN [dbo].[ItemInfo] final
        on stg.ItemId = final.ItemId
) as SOURCE
ON SOURCE.ItemID = target.ItemID

WHEN MATCHED THEN
    Update SET
        target.ItemID = SOURCE.ItemID
        , target.ItemName = SOURCE.ItemName
        , target.ItemType = SOURCE.ItemType

WHEN NOT MATCHED BY TARGET THEN
    INSERT (ItemID, ItemName, ItemType )
        VALUES (SOURCE.ItemID, SOURCE.ItemName, SOURCE.ItemType ) 

WHEN NOT MATCHED BY SOURCE THEN
    DELETE
;
0 голосов
/ 19 мая 2011

SSIS Звучит как путь. Я уже видел способ решения вашей проблемы с помощью промежуточной таблицы. Новый документ загружается в промежуточную таблицу, затем сравниваются промежуточные и производственные данные, устаревшие записи архивируются (не удаляются ТОЛЬКО) из производства, обновляются существующие строки с некоторыми изменениями (опять же, исходные данные где-то архивируются) и новые строки вставляются.

Примечание. Ваше определение "устаревшего" должно быть очень, очень точным. Например: должно ли что-то быть заархивировано только потому, что в вашем последнем файле отсутствует соответствующая строка? Должно ли оно оставаться в течение X времени, если оно появится в следующем файле? Эти и другие вопросы следует рассмотреть.

Практически любой стандартный учебник по SSIS должен указывать правильный путь для каждого из этих шагов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...