Как правильно усечь промежуточную таблицу в конвейере ETL? - PullRequest
5 голосов
/ 31 января 2020

У нас есть конвейер ETL, который запускается для каждого CSV, загруженного в учетную запись хранения (Azure). Он выполняет некоторые преобразования в CSV и записывает выходные данные в другое место, также как CSV, и вызывает хранимую процедуру в базе данных (SQL Azure), которая принимает (BULK INSERT) этот результирующий CSV в промежуточную таблицу.

Этот конвейер может иметь одновременное выполнение, так как несколько ресурсов могут загружать файлы в хранилище. Следовательно, в промежуточную таблицу довольно часто вставляются данные.

Затем у нас есть запланированное задание SQL (Elasti c Job), которое запускает SP, который перемещает данные из промежуточной таблицы в финальную. Таблица. На этом этапе мы хотели бы усечь / очистить промежуточную таблицу, чтобы мы не вставляли их повторно при следующем выполнении задания.

Проблема в том, что мы не можем быть уверены, что между загрузкой из промежуточная таблица к финальной таблице и команда усечения, в промежуточную таблицу не было записано никаких новых данных, которые можно было бы усечь без предварительной вставки в финальную таблицу.

Существует ли способ блокировки промежуточная таблица, в то время как мы копируем данные в финальную таблицу, чтобы SP (вызванный из конвейера ETL), пытаясь записать в нее, просто подождал, пока не будет снята блокировка? Это достижимо с помощью транзакций или, может быть, с помощью некоторых ручных команд блокировки?

Если нет, как лучше всего справиться с этим?

Ответы [ 5 ]

1 голос
/ 05 февраля 2020

Мне нравится sp_getapplock, и я сам использую этот метод в нескольких местах из-за его гибкости и полного контроля над логикой блокировки c и временем ожидания.

Единственная проблема, которую я вижу, состоит в том, что в вашем случае параллельные процессы не все равны.

У вас есть SP1, который перемещает данные из промежуточной таблицы в основную таблицу. Ваша система никогда не пытается запустить несколько экземпляров этого SP.

Другой SP2, который вставляет данные в промежуточную таблицу , может запускаться несколько раз одновременно, и это нормально.

Легко реализовать блокировку, которая предотвратит любой параллельный запуск любой комбинации SP1 или SP2. Другими словами, легко, если логика блокировки c одинакова для SP1 и SP2, и они рассматриваются как равные. Но тогда вы не можете иметь несколько экземпляров SP2, работающих одновременно.

Не очевидно, как реализовать блокировку, которая предотвратит одновременный запуск SP1 и SP2, одновременно позволяя нескольким экземплярам SP2 работать одновременно.


Существует другой подход, который не пытается предотвратить одновременный запуск SP, но охватывает и предполагает, что возможны одновременные запуски.

Один из способов сделать это - добавить IDENTITY столбец к промежуточной таблице. Или автоматическое заполнение даты и времени, если вы можете гарантировать, что оно уникально и никогда не уменьшается, что может быть сложно. Или rowversion столбец.

Лог c внутри SP2, который вставляет данные в промежуточную таблицу, не изменяется.

Лог c внутри СП1 для перемещения данных из промежуточной таблицы в основную таблицу необходимо использовать эти значения идентификаторов.

Сначала прочитайте текущее максимальное значение идентификатора из промежуточной таблицы и запомните его в переменной, скажем, @MaxID. Все последующие операции SELECT, UPDATE и DELETE из промежуточной таблицы в этом SP1 должны включать фильтр WHERE ID <= @MaxID.

. Это позволит гарантировать, что в случае появления новой строки, добавленной в промежуточную таблицу во время работы SP1, эта строка не будет обработана и будет оставаться в промежуточной таблице до следующего запуска SP1.

Недостаток этого подхода заключается в том, что вы не можете использовать TRUNCATE, вам нужно использовать DELETE с WHERE ID <= @MaxID.


Если вы в порядке с несколькими экземплярами SP2, ожидающими друг друга (и SP1), то вы можете использовать sp_getapplock, как показано ниже. У меня есть этот код в моей хранимой процедуре. Вы должны поместить эту логику c как в SP1, так и в SP2.

Я не буду звонить sp_releaseapplock здесь явно, потому что для владельца блокировки установлено значение Transaction, и механизм освободит блокировка автоматически по окончании транзакции.

Вам не нужно помещать logry retry c в хранимую процедуру, это может быть внутри внешнего кода, который выполняет эти хранимые процедуры. В любом случае ваш код должен быть готов к повторной попытке.

CREATE PROCEDURE SP2  -- or SP1
AS
BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;

    BEGIN TRANSACTION;
    BEGIN TRY
        -- Maximum number of retries
        DECLARE @VarCount int = 10;

        WHILE (@VarCount > 0)
        BEGIN
            SET @VarCount = @VarCount - 1;

            DECLARE @VarLockResult int;
            EXEC @VarLockResult = sp_getapplock
                @Resource = 'StagingTable_app_lock',
                -- this resource name should be the same in SP1 and SP2
                @LockMode = 'Exclusive',
                @LockOwner = 'Transaction',
                @LockTimeout = 60000,
                -- I'd set this timeout to be about twice the time
                -- you expect SP to run normally
                @DbPrincipal = 'public';

            IF @VarLockResult >= 0
            BEGIN
                -- Acquired the lock

                -- for SP2
                -- INSERT INTO StagingTable ...

                -- for SP1
                -- SELECT FROM StagingTable ...
                -- TRUNCATE StagingTable ...

                -- don't retry any more
                BREAK;
            END ELSE BEGIN
                -- wait for 5 seconds and retry
                WAITFOR DELAY '00:00:05';
            END;
        END;

        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        ROLLBACK TRANSACTION;
        -- log error
    END CATCH;

END

Этот код гарантирует, что в любой момент времени с промежуточной таблицей работает только одна процедура. Там нет параллелизма. Все остальные экземпляры будут ждать.

Очевидно, что если вы попытаетесь получить доступ к промежуточной таблице не через эти SP1 или SP2 (которые сначала пытаются получить блокировку), то такой доступ не будет заблокирован.

1 голос
/ 06 февраля 2020

Я бы предложил решение с двумя одинаковыми промежуточными таблицами. Давайте назовем их StageLoading и StageProcessing.
Процесс загрузки будет состоять из следующих шагов:
1. В начале обе таблицы пусты.
2. Мы загружаем некоторые данные в таблицу StageLoading (я предполагаю, что каждая загрузка является транзакцией ).
3. При запуске задания Elasti c выполняется:
- ПЕРЕКЛЮЧАТЕЛЬ ТАБЛИЦЫ ALTER для перемещения всех данных из StageLoading в StageProcessing. Это сделает StageLoading пустым и готовым к следующим загрузкам. Это операция метаданных, поэтому она занимает миллисекунды и полностью блокируется, поэтому будет выполняться между загрузками.
- загрузить данные из StageProcessing в финальные таблицы.
- усечь таблицу StageProcessing.
4. Теперь мы готовы к следующему заданию Elasti c.

Если мы попытаемся выполнить SWITCH, когда StageProcessing не пуст, ALTER завершится ошибкой, и это будет означать, что последний процесс загрузки завершился неудачей.

0 голосов
/ 08 февраля 2020

Я всегда предпочитаю "идентифицировать" каждый файл, который я получаю. Если вы можете сделать это, вы можете связать записи из данного файла на протяжении всего процесса загрузки. Вы не вызывали необходимость в этом, но jus sayin.

Однако, если у каждого файла есть идентификатор (это должно делать только значение идентификатора int / bigint), вы можете динамически создавать столько таблиц загрузки, сколько Вам нравится таблица загрузки "шаблона".

  1. Когда файл поступит, создайте новую таблицу загрузки, названную с идентификатором файла.
  2. Обработка ваших данных от загрузки до финала table.
  3. удалить таблицу загрузки для обрабатываемого файла.

Это похоже на другое решение об использовании 2 таблиц (load и stage), но даже в этом решении вы по-прежнему ограничены возможностью «загружать» 2 файла (хотя вы все еще применяете только один файл к финальной таблице?)

Последнее, неясно, отсоединено ли ваше «задание Elasti c» от фактического «загрузить» конвейер / обработку или, если он включен. Будучи заданием, я предполагаю, что оно не включено, если задание можно запустить только один экземпляр за раз? Поэтому неясно, почему важно загружать несколько файлов одновременно, если вы можете одновременно перемещать только один из файлов загрузки в финальный. Зачем ру sh загружать файлы в загрузку?

0 голосов
/ 07 февраля 2020

Предполагая, что у вас есть одно исходящее задание

  • Добавьте OutboundProcessing BIT DEFAULT 0 в таблицу
  • В задании SET OutboundProcessing = 1 WHERE OutboundProcessing = 0 (запросить строки )
  • Для ETL включить WHERE OutboundProcessing = 1 в запрос, который получает данные (передать строки)
  • После ETL УДАЛИТЬ ИЗ ТАБЛИЦЫ ГДЕ OutboundProcessing = 1 (удалить строки, которые вы передано)
  • Если ETL завершается неудачно, SET OutboundProcessing = 0, ГДЕ OutboundProcessing = 1
0 голосов
/ 02 февраля 2020

Есть ли способ заблокировать промежуточную таблицу, пока мы копируем данные в финальную таблицу, чтобы SP (вызываемый из конвейера ETL), пытавшийся записать в нее, просто подождал, пока блокировка освобождает? Это достижимо с помощью транзакций или, возможно, некоторых команд ручной блокировки?

Похоже, вы ищете механизм, который шире, чем уровень транзакции. SQL Сервер / Azure SQL БД имеет один и называется блокировка приложения :

sp_getapplock

Places блокировка ресурса приложения.

Блокировки, размещенные на ресурсе, связаны либо с текущей транзакцией, либо с текущим сеансом. Блокировки, связанные с текущей транзакцией, снимаются, когда транзакция фиксируется или откатывается. Блокировки, связанные с сеансом, снимаются при выходе из сеанса. Когда сервер по какой-либо причине отключается, все блокировки снимаются.

Блокировки могут быть явно сняты с помощью sp_releaseapplock. Когда приложение вызывает sp_getapplock несколько раз для одного и того же ресурса блокировки, процедура sp_releaseapplock должна вызываться одинаковое количество раз, чтобы снять блокировку. Когда блокировка открывается владельцем транзакции, эта блокировка снимается, когда транзакция фиксируется или откатывается.

Это в основном означает, что ваш инструмент ETL должен открыть один сеанс для БД, получить блокировку и отпустите, когда закончите. Другие сеансы, прежде чем пытаться что-либо сделать, должны попытаться получить блокировку (они не могут этого сделать, потому что она уже взята), подождать, пока она не освободится, и продолжить работу.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...