извлекать и преобразовывать сообщения кафки c поля для JDB c коннектора приемника - PullRequest
0 голосов
/ 18 апреля 2020

У меня есть kafka topi c, который получает данные из базы данных mysql, используя исходный соединитель Debezium mysql, ниже приведен формат одного из сообщений:

{
    "Message": {
        "schema": {
            "type": "struct",
            "fields": [
              ...
            ],
            "optional": true,
            "name": "mysql-server-1.inventory.somename"
        },
        "payload": {
            "op": "u",
            "ts_ms": 1465491411815,
            "before": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Doof",
                "email": "annek@noanswer.org"
            },
            "after": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Marry",
                "email": "annek@noanswer.org"
            },
            "source": {
                "db": "inventory",
                "table": "customers",
                ...
                "query": "Update customers set last_name = 'Marry' where id = 1004"
            }
        }
    }
}

Я хочу столбцы pu sh ts_ms, before, after и id (из объекта / строки) в другую базу данных, используя соединитель приемника jdb c со схемой таблицы как (id,before(text),after(text),timestamp), так как новичок в kafka не может понять:

  • как я могу извлечь только эти поля, из сообщения в pu sh и игнорировать другие?

  • как я могу преобразовать до, после того, как поля в строку / Формат сериализации?

  • Как мне извлечь id из объекта? (в случае операции вставки, before будет нулевым, для удаления, after будет нулевым)

Для приведенного выше сообщения таблица назначения приемника должна содержать в конце следующие данные:

id:     1004
before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"annek@noanswer.org"}'
after:  '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"annek@noanswer.org"}'
timestamp: 1465491411815

Ответы [ 2 ]

0 голосов
/ 19 апреля 2020

Вы можете создать DTO (Java объект для вашей json полезной нагрузки, которую вы получаете от вашей kafka topi c), использование этого онлайн-конвертера поможет вам конвертировать ваши json в Java объекты , [http://pojo.sodhanalibrary.com/] [1]

Как только вы получите сообщение от kafka topi c, вы можете использовать objectmapper для преобразования этого json и сопоставления его с вашим соответствующим DTO объекты. Как только у вас есть готовый объект. Вы можете использовать этот объект для извлечения полей, которые вы хотите, просто вызвав getId (), getBefore () et c ..,

Вот некоторый справочный код, который поможет вам понять:

    @KafkaListener(topics = "test")
        public void listen(String payload)  {

            logger.info("Message Received from Kafka topic: {}", payload);

            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

            DTOObject dtoObject = objectMapper.readValue(payload,DTOObject.class);

                logger.info("After Convertion: {}", objectMapper.writeValueAsString(dtoObject));

                logger.info("Get Before:{}", dtoObject.getId());



        }
0 голосов
/ 18 апреля 2020

Вы можете использовать цепочку Преобразований Kafka Connect , например, решение .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...