Соединитель Kafka Connect FileStreamSink удаляет кавычки и заменяет двоеточие на знак равенства для сообщения JSON - PullRequest
0 голосов
/ 28 июня 2019

Резюме

Когда я транслирую это с производителем консоли

{"id":1337,"status":"example_topic_1 success"}

Я получил это от моего потребителя файлового потока

/ данные / example_topic_1.txt

{id=1337, status=example_topic_1 success}

Это большая проблема для меня, потому что исходное сообщение JSON невозможно восстановить, не делая предположений о том, где раньше были кавычки. Как вывести сообщения в файл, сохранив кавычки?

Подробнее

  1. Сначала я запускаю мой коннектор приемника файлов.
    # sh bin/connect-standalone.sh \
    >   config/worker.properties \
    >   config/connect-file-sink-example_topic_1.properties
    
  2. Во-вторых, я запускаю консольный потребитель (также встроенный в Kafka), чтобы у меня было простое визуальное подтверждение того, что сообщения проходят правильно.
    # sh bin/kafka-console-consumer.sh \
    >   --bootstrap-server kafka_broker:9092 \
    >   --topic example_topic_1
    
  3. Наконец, я запускаю производителя консоли для отправки сообщений и ввожу сообщение.

    # sh bin/kafka-console-producer.sh \
    >   --broker-list kafka_broker:9092 \
    >   --topic example_topic_1
    

    От потребителя консоли сообщение появляется корректно с кавычками.

    {"id":1337,"status":"example_topic_1 success"}
    

    Но я получаю это от моего потребителя FileStreamSink:

    / данные / example_topic_1.txt

    {id=1337, status=example_topic_1 success}
    

Моя конфигурация

конфиг / worker.properties

offset.storage.file.filename=/tmp/example.offsets

bootstrap.servers=kafka_broker:9092
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

конфиг / connect-file-sink-example_topic_1.properties

name=file-sink-example_topic_1
connector.class=FileStreamSink
tasks.max=1
file=/data/example_topic_1.txt
topics=example_topic_1

1 Ответ

1 голос
/ 28 июня 2019

Поскольку вы на самом деле не хотите анализировать данные JSON, а просто передавать их как кусок текста, вам необходимо использовать StringConverter:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

В этой статье объясняется больше онюансы конвертеров: https://rmoff.net/2019/05/08/when-a-kafka-connect-converter-is-not-a-converter/. Здесь показан пример того, что вы пытаетесь сделать, хотя вместо производителя / потребителя консоли используется kafkacat.

...