Разъем Kafka RabbitMQ - можно получить только байтовые массивы - PullRequest
0 голосов
/ 07 января 2020

Я установил соединитель для извлечения из очереди RabbitMQ и вывода sh в топик Кафки c. Соединитель работает, и очередь очищается. Но когда я смотрю на topi c с помощью kafka-console-consumer или kafkacat, каждая запись выглядит как массив байтов - [B@xxxxxxxx.

Все полезные нагрузки сообщения RabbitMQ равны JSON. Что мне нужно сделать, чтобы вернуть JSON из Кафки? Я попытался value.converter=org.apache.kafka.connect.storage.StringConverter, а также с помощью ByteArrayDeserializer с потребителем консоли.

connect-standalone.properties:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/home/robbie/kafka/plugins

RabbitMQSourceConnector.properties:

name=rabbitmq
tasks.max=1
connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
rabbitmq.prefetch.count=500
rabbitmq.automatic.recovery.enabled=false
rabbitmq.network.recovery.interval.ms=10000
rabbitmq.topology.recovery.enabled=true
rabbitmq.queue=test1
rabbitmq.username=testuser1
rabbitmq.password=xxxxxxxxxxxxxxx
rabbitmq.host=rmqhost
rabbitmq.port=5672
kafka.topic=rabbitmq.test1

1 Ответ

1 голос
/ 08 января 2020

Вам нужно установить

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

Я написал блог об этом только сегодня :)

...