Как заставить Kafka Sink Connector работать с сериализованным ключом Avro и значением для postgres - PullRequest
0 голосов
/ 27 марта 2019

У меня есть тема Kafka, содержащая сообщения с сериализованным ключом Avro и сериализованным значением Avro.

Я пытаюсь настроить соединитель приемника для помещения этих значений в таблицу базы данных postgres (в данном случае AWS RDS).

Я пробовал несколько вариантов по теме, сообщениям и конфигурации раковин, но, глядя на следующий пример, , если кто-то может дать рекомендации о том, где я иду не так, это было бы здорово! :)

Моя тема имеет следующую схему (в реестре схем) ...

Схема ключей

{
    "type": "record",
    "name": "TestTopicKey",
    "namespace": "test.messaging.avro",
    "doc": "Test key schema.",
    "fields": [
        {
            "name": "unitId",
            "type": "int"
        }
    ]
}

Схема значений

{
    "type": "record",
    "name": "TestTopicValues",
    "namespace": "test.messaging.avro",
    "doc": "Test value schema.",
    "fields": [
        {
            "name": "unitPrice",
            "type": "int",
            "doc": "Price in AUD excluding GST."
        },
        {
            "name": "unitDescription",
            "type": "string"
        }
    ]
}

Я вручную создаю записи по теме, используя «kafka-avro-console -roduction», следующим образом:

/bin/kafka-avro-console-producer --broker-list kafka-box-one:9092 --topic test.units --property parse.key=true --property "key.separator=|" --property "schema.registry.url=http://kafka-box-one:8081" --property key.schema='{"type":"record","name":"TestTopicKey","namespace":"test.messaging.avro","doc":"Test key schema.","fields":[{"name":"unitId","type":"int"}]}' --property value.schema='{"type":"record","name":"TestTopicValues","namespace":"test.messaging.avro","doc":"Test value schema.","fields":[{"name":"unitPrice","type":"int","doc":"Price in AUD excluding GST."},{"name":"unitDescription","type":"string"}]}'

Как только этот продюсер запустится, я смогу успешно добавить записи в тему следующим образом:

{"unitId":111}|{"unitPrice":15600,"unitDescription":"A large widget thingy."}

Примечание: я также могу успешно использовать kafka-avro-console-consumer, как и ожидалось.

Таблица postgres, в которую я пытаюсь войти, выглядит следующим образом:

CREATE TABLE test_area.unit_prices (
    unitId int4 NOT NULL,
    unitPrice int4 NULL,
    unitDescription text NULL,
    CONSTRAINT unit_prices_unitid_pk PRIMARY KEY (unitId)
);

Мой разъем для мойки выглядит так:

{
  "name": "test.area.unit.prices.v01",
  "config": {
      "connector.class": "JdbcSinkConnector",
      "topics": "test.units",
      "group.id": "test.area.unit.prices.v01",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "key.converter.schema.registry.url": "http://kafka-box-one:8081",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://kafka-box-one:8081",
      "connection.user": "KafkaSinkUser",
      "connection.password": "KafkaSinkPassword",
      "connection.url": "jdbc:postgresql://unit-catalogue.abcdefghij.my-region-1.rds.amazonaws.com:5432/unit_sales?currentSchema=test_area",
      "table.name.format": "unit_prices",
      "auto.create": false,
      "auto.evole": "false"
  }
}

Я ожидаю, что записи появятся в таблице postgres вскоре после того, как Sink будет показан как работающий. Однако ничто не тонет.

Дополнительные примечания:

  • Я могу подключиться и записать в экземпляр postgres RDS из коробки Kafka Connect, на которой этот соединитель Sink публикуется с учетными данными в соответствии с соединителем Sink, используя usql.
  • Состояние разъема приемника «работает», что говорит мне об отсутствии ошибок в синтаксисе Sink.
...