Вход Kafka (Confluent Platform) для Logstash - неправильная кодировка сообщений - PullRequest
0 голосов
/ 22 мая 2018

У меня есть Confluent Platform (версия 4.1.1).Он настроен на чтение данных из базы данных.Конфигурация для этого:

name = source-mysql-requests
connection.url = jdbc:mysql://localhost:3306/Requests
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
connection.user = ***
connection.password = ***
mode = incrementing
incrementing.column.name = ID
tasks.max = 5
topic.prefix = requests_
poll.interval.ms = 1000
batch.max.rows = 100
table.poll.interval.ms = 1000

У меня также есть Logstash (версия 6.2.4) для чтения соответствующей темы Kafka.Вот его конфигурация:

kafka { 
    bootstrap_servers => "localhost:9092" 
    topics => ["requests_Operation"] 
    add_field => { "[@metadata][flag]" => "operation" }
}
output { 
 if [@metadata][flag] == "operation" { 
    stdout { 
        codec => rubydebug 
    } 
 } 
}

Когда я запускаю «kafka-avro-console-consumer» для теста, я получаю сообщения такого типа:

{"ID":388625154,"ISSUER_ID":"8e427b6b-1176-4d4a-8090-915fedcef870","SERVICE_ID":"mercury-g2b.service:1.4","OPERATION":"prepareOutcomingConsignmentRequest","STATUS":"COMPLETED","RECEIVE_REQUEST_DATE":1525381951000,"PRODUCE_RESULT_DATE":1525381951000}

Но в Logstash Iесть что-то ужасное и нечитаемое:

"\u0000\u0000\u0000\u0000\u0001����\u0002Hfdebfb95-218a-11e2-a69b-b499babae7ea.mercury-g2b.service:1.4DprepareOutcomingConsignmentRequest\u0012COMPLETED���X���X"

Что может пойти не так?

1 Ответ

0 голосов
/ 09 января 2019

Вы можете изменить Kafka Connect, чтобы он не использовал Avro, изменив конфигурации для value.converter и key.converter, например, на использование JSON.

В противном случае вам потребуется Logstash, чтобы знать, как интерпретироватьРеестр схемы закодирует данные Avro и преобразует их в удобочитаемый формат.

В качестве альтернативы, вы могли бы использовать Elasticsearch или консольный приемник Connect и полностью пропустить Logstash, предполагая, что это цель

Вы также можете использовать SMT Connect для замены конфигурации Logstash add_field : operation

...