Потоковая передача Реляционная СУБД мгновенная съемка исходных данных через Spark Streaming проста, но нет прямого способа получить конечные изменения, происходящие в БД.
Лучшее решение - go через Debezium SQL Server Connector
Debezium SQL Server Connector может отслеживать и записывать на уровне строк изменения в схемах базы данных SQL сервера.
- Вам потребуется настроить кластер Kafka
- Включить CD C для SQL сервера
SQL Серверный компакт-диск C не предназначен для хранения полной истории изменений базы данных. Таким образом, необходимо, чтобы Debezium установил базовый уровень текущего контента базы данных и направил его в Kafka. Это достигается с помощью процесса, называемого моментальным снимком.
По умолчанию (начальный режим моментального снимка) коннектор при первом запуске выполняет начальный согласованный моментальный снимок базы данных (что означает, что структура и данные в любых таблицах будут записаны как согласно конфигурации фильтра коннектора).
Каждый моментальный снимок состоит из следующих шагов:
Determine the tables to be captured
Obtain a lock on each of the monitored tables to ensure that no structural changes can occur to any of the tables. The level of the lock is determined by snapshot.isolation.mode configuration option.
Read the maximum LSN ("log sequence number") position in the server’s transaction log.
Capture the structure of all relevant tables.
Optionally release the locks obtained in step 2, i.e. the locks are held usually only for a short period of time.
Scan all of the relevant database tables and schemas as valid at the LSN position read in step 3, and generate a READ event for each row and write that event to the appropriate table-specific Kafka topic.
Record the successful completion of the snapshot in the connector offsets.
Чтение таблиц измененных данных
При первом запуске коннектор принимает структурный снимок структуры захваченных таблиц и сохраняет эту информацию в своей внутренней истории базы данных topi c. Затем коннектор идентифицирует таблицу изменений для каждой из исходных таблиц и выполняет основную l oop
For each change table read all changes that were created between last stored maximum LSN and current maximum LSN
Order the read changes incrementally according to commit LSN and change LSN. This ensures that the changes are replayed by Debezium in the same order as were made to the database.
Pass commit and change LSNs as offsets to Kafka Connect.
Store the maximum LSN and repeat the loop.
После перезапуска коннектор возобновит работу со смещения (фиксация и изменение номеров LSN), с которого он оставался. выключен раньше.
Коннектор может определять, включен ли CD C для исходной таблицы из белого списка во время выполнения, и изменять его поведение.
Серверный коннектор SQL записывает события для всех операций вставки, обновления и удаления в одной таблице в один Kafka topi c. Имя тем Kafka всегда имеет вид serverName.schemaName.tableName, где serverName - это логическое имя коннектора, указанное в свойстве конфигурации database.server.name, schemaName - это имя схемы, в которой произошла операция, и tableName - это имя таблицы базы данных, в которой произошла операция.
Например, рассмотрим установку сервера SQL с базой данных инвентаризации, содержащей четыре таблицы: products, products_on_hand, customers, and orders
в схеме dbo
. Если соединителю, контролирующему эту базу данных, было присвоено логическое имя сервера выполнения, то соединитель будет генерировать события по этим четырем темам Kafka:
fulfillment.dbo.products
fulfillment.dbo.products_on_hand
fulfillment.dbo.customers
fulfillment.dbo.orders