Как вытащить данные из удаленной базы данных в Apache Kafka? - PullRequest
0 голосов
/ 28 августа 2018

Я хочу сделать конвейер данных в реальном времени в Apache Kafka. У меня есть база данных, которая находится в удаленном месте, и эта база данных постоянно обновляется. Кто-нибудь может использовать API Kafka connect, который я должен использовать для извлечения данных из базы данных и загрузки в брокер Kafka в режиме реального времени? позже я бы использовал поток kafka и KSQL для выполнения специальных запросов для выполнения метрик.

Любая помощь будет принята с благодарностью!

Ответы [ 2 ]

0 голосов
/ 30 августа 2018

Если вы хотите создать конвейер данных в реальном времени, вам нужно использовать инструмент изменения данных (CDC), который может передавать изменения из MySQL. Я бы предложил Debezium , которая является распределенной платформой с открытым исходным кодом для сбора данных изменений.

Захватывающие вставки

Когда в таблицу добавляется новая запись, создается JSON, аналогичный приведенному ниже:

{  
   "payload":{  
      "before":null,
      "after":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"giorgos@abc.com"
      },
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500369632,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":364,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"c",
      "ts_ms":1500369632095
   }
}

before объект равен нулю, а after объект содержит вновь вставленные значения. Обратите внимание, что атрибут op равен c, что указывает на то, что это событие CREATE.

Получение обновлений

При условии, что атрибут email был обновлен, будет создан JSON, аналогичный приведенному ниже:

{ 
    "payload":{  
      "before":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"giorgos@abc.com"
      },
      "after":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"newEmail@abc.com"
      },
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500369929,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":673,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"u",
      "ts_ms":1500369929464
   }
}

Обратите внимание op, что теперь u, указывая, что это было событие ОБНОВЛЕНИЕ before объект показывает состояние строки перед обновлением, а after объект фиксирует текущее состояние обновленной строки.

Захват удаляет

Теперь предположим, что строка была удалена;

{ 
    "payload":{  
      "before":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"newEmail@abc.com"
      },
      "after":null,
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500370394,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":1025,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"d",
      "ts_ms":1500370394589
   }
}

op new равно d, что указывает на событие DELETE. Атрибут after будет нулевым, а объект before содержит строку перед удалением.

Вы также можете ознакомиться с обширным учебником , представленным на их сайте.

РЕДАКТИРОВАТЬ: Пример конфигурации для базы данных MySQL

{
  "name": "inventory-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "3306", (4)
    "database.user": "debezium", (5)
    "database.password": "dbz", (6)
    "database.server.id": "184054", (7)
    "database.server.name": "fullfillment", (8)
    "database.whitelist": "inventory", (9)
    "database.history.kafka.bootstrap.servers": "kafka:9092", (10)
    "database.history.kafka.topic": "dbhistory.fullfillment" (11)
    "include.schema.changes": "true" (12)
  }
}

1 Название нашего разъема, когда мы регистрируем его в Kafka Connect. оказание услуг.
2 Имя этого класса коннектора MySQL.
3 Адрес сервер MySQL.
4 Номер порта сервера MySQL.
5 Имя пользователь MySQL, имеющий необходимые привилегии.
6 Пароль для пользователь MySQL, имеющий необходимые привилегии.
7 разъемов идентификатор, который должен быть уникальным в кластере MySQL и похож на Свойство конфигурации MySQL для идентификатора сервера.
8 Логическое название MySQL сервер / кластер, который формирует пространство имен и используется во всех названия тем Кафки, в которые пишет коннектор, Кафка Подключите имена схем и пространства имен соответствующих Avro схема, когда используется Avro Connector.
9 Список всех баз данных размещенный на этом сервере, этот соединитель будет контролировать Это необязательно, и есть другие свойства для перечисления баз данных и таблицы для включения или исключения из мониторинга.
10 Список Кафки брокеры, которые этот разъем будет использовать для записи и восстановления DDL заявления в тему истории базы данных.
11 Название базы данных тема истории, где коннектор напишет и восстановит DDL заявления. Эта тема предназначена только для внутреннего использования и не должна использоваться потребителями.
12 Флаг, указывающий, что разъем должен сгенерировать в теме изменения схемы с именем события полного заполнения с помощью Изменения DDL, которые могут быть использованы потребителями.

0 голосов
/ 30 августа 2018

Если вы читаете из базы данных MySQL, используйте коннектор JDBC-источника Confluent. https://github.com/confluentinc/kafka-connect-jdbc/ Вам также нужно скачать драйвер MYSQL и поставить его вместе с jaf-файлами kafka: https://dev.mysql.com/downloads/connector/j/5.1.html

...