Я развернул Кафку с здесь . Также я добавил в docker-compose.yml
Postgres контейнер следующим образом:
postgres:
image: postgres
hostname: kafka-postgres
container_name: kafka-postgres
depends_on:
- ksql-server
- broker
- schema-registry
- connect
ports:
- 5432:5432
Создал topi c просмотров страниц.
Далее я создал DatagenConnector с настройками и запустил его.
{
"name": "datagen-pageviews",
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"kafka.topic": "pageviews",
"max.interval": "100",
"iterations": "999999999",
"quickstart": "pageviews"
}
Насколько я вижу, коннектор определил схему для topi c:
{
"type": "record",
"name": "pageviews",
"namespace": "ksql",
"fields": [
{
"name": "viewtime",
"type": "long"
},
{
"name": "userid",
"type": "string"
},
{
"name": "pageid",
"type": "string"
}
],
"connect.name": "ksql.pageviews"
}
Мой следующий шаг - создать JdbcSinkConnector, который будет передавать данные из Kafka topi c до Postgres таблицы. Это сработало. Настройки коннектора:
{
"name": "from-kafka-to-pg",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics": [
"pageviews"
],
"connection.url": "jdbc:postgresql://kafka-postgres:5432/postgres",
"connection.user": "postgres",
"connection.password": "********",
"auto.create": "true",
"auto.evolve": "true"
}
Затем я сам пытаюсь отправить сообщения на этот топи c. Но произошла ошибка с ошибкой:
[2020-02-01 21: 16: 11,750] ОШИБКА Обнаружена ошибка в задаче to-pg-0. Выполнение этапа 'VALUE_CONVERTER' с классом 'io.confluent.connect.avro.AvroConverter', где используемой записью является {topic = 'viewviews', partition = 0, offset = 23834, timestamp = 1580591160374, timestampType = CreateTime}. (org. apache .kafka.connect.runtime.errors.LogReporter) org. apache .kafka.connect.errors.DataException: не удалось десериализовать данные для topi c просмотров страниц в Avro: в io.confluent.connect .avro.AvroConverter.toConnectData (AvroConverter. java: 110) в орг. apache .kafka.connect.runtime.WorkerSinkTask.lambda $ convertAndTransformRecord $ 1 (WorkerSinkTask. java: 487) в орг. kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry (RetryWithToleranceOperator. java: 128) at or. apache .kafka.connect.runtime.errors.RetryWithToleranceOperator.execute (RetryWithToleranceOperator. java: 104) в орг. org. apache .kafka.connect.runtime.WorkerSinkTask.convertMessages (WorkerSinkTask. java: 464) в org. apache .kafka.connect.runtime.WorkerSinkTask.poll (WorkerSinkTask. java : 320) в орг. apache .kafka.connect.runtime.WorkerSinkTask.iteration (WorkerSinkTask. java: 224) в орг. apache .kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask. java : 192) в орг. apache .kafka.connect.runtime.WorkerTask.doRun (WorkerTask. java: 177) в орг. apache .kafka.connect.runtime.WorkerTask.run (WorkerTask. java : 227) в java .util.concurrent.Executors $ RunnableAdapter.call (Executors. java: 511) в java .util.concurrent.FutureTask.run (FutureTask. java: 266) в java .util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor. java: 1149) в java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) в java .langTh run (Thread. java: 748) Причина: org. apache .kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора -1. Причина: org. apache .kafka.common.errors. Исключение SerializationException: неизвестный магический байт!
Так что метод send имеет значение. Вот как я это делаю (Python, confluent-kafka- python):
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.poll(0)
producer.produce(topic, json.dumps({
'viewtime': 123,
'userid': 'user_1',
'pageid': 'page_1'
}).encode('utf8'), on_delivery=kafka_delivery_report)
producer.flush()
Может быть, мне следует предоставить схему с сообщением (AvroProducer)?