Можно ли определить схему на kafka topi c? - PullRequest
0 голосов
/ 14 апреля 2020

Я отправляю схему данных и значений в kafka topi c следующим образом:

./bin/kafka-avro-console-producer \
  --broker-list 10.0.0.0:9092 --topic orders \
  --property parse.key="true" \
  --property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
  --property key.separator="$" \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":["null","int"],"default": null},{"name":"product","type": ["null","string"],"default": null}, {"name":"quantity", "type":  ["null","int"],"default": null}, {"name":"price","type":  ["null","int"],"default": null}]}' \
  --property schema.registry.url=http://10.0.0.0:8081

затем я получаю эти данные от kafka для свойств этого приемника:

{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "[redact]",
    "connection.password": "[redact]",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "insert.mode": "upsert",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}

Но я хочу получить json от Кафки без value.schema. если я поставлю kafka topi c только эти json данные

{"id":9}${"id": {"int":9}, "product": {"string":"Yağız Gülbahar"}, "quantity": {"int":1071}, "price": {"int":61}}

Как я могу получить эти данные от kafka и поставить oracle с сливным jdb c стоком.

Я хочу создать схему на стороне Kafka Connect?

И еще одна вещь: могу ли я получить два разных типа данных из одной kafka topi c, и это будет две разные таблицы на oracle сторона с JDB c раковина.

1 Ответ

1 голос
/ 14 апреля 2020

Если у вас есть исходный файл topi c с данными JSON, для которых не объявлена ​​схема, вы должны добавить эту схему, прежде чем сможете использовать приемник JDB C.

Опции включают в себя:

  1. ksqlDB, как показано здесь: https://www.youtube.com/watch?v=b-3qN_tlYR4&t=981s
  2. Kafka Connect Преобразование одного сообщения возможностей. Ни один SMT не поставляется с Apache Kafka, который делает это, но есть прототипов , которые могли бы сделать это.
  3. Другая обработка потока, например, Kafka Streams

Редактировать :

Я имею в виду, могу ли я определить два разных jdb c стока для разных oracle таблиц из одной kafka topi c

Да, каждый топи c может потребляться несколькими приемниками. Параметр конфигурации table.name.format может использоваться для маршрутизации topi c к различным именам таблиц по мере необходимости.

...