Я просто хочу получить некоторые данные Json из Kafka, а затем выполнить какое-то преобразование с помощью Morphline. Я просто получаю это исключение:
ОШИБКА kafka.KafkaSource: KafkaSource EXCEPTION, {} org.apache.flume.FlumeException:> org.apache.flume.sink.solr.morphline.MorphlineInterceptor $ LocalMorphlineInterc>eptor не должен генерировать вложения, которые не являются байтами [] или InputStream
, это моя конфигурация потока:
# Name the components on this agent
a1.sources = kafka-source
a1.sinks = console
a1.channels = mem-channel
# Describe/configure the source
a1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.kafka-source.batchSize = 100
a1.sources.kafka-source.kafka.bootstrap.servers = 10.20.80.220:9092
a1.sources.kafka-source.kafka.topics = worm_video_info
a1.sources.kafka-source.kafka.consumer.group.id = flume-consumer-2
a1.sources.kafka-source.kafka.consumer.auto.offset.reset=earliest
a1.sources.kafka-source.interceptors = morphlineinterceptor
a1.sources.kafka-source.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.kafka-source.interceptors.morphlineinterceptor.morphlineFile = morphline-test.conf
a1.sources.kafka-source.interceptors.morphlineinterceptor.morphlineId = morphline-test-id
# Describe the sink
a1.sinks.console.type = logger
# Use a channel which buffers events in memory
a1.channels.mem-channel.type = memory
a1.channels.mem-channel.capacity = 1000
a1.channels.mem-channel.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.kafka-source.channels = mem-channel
a1.sinks.console.channel = mem-channel
, это моя конфигурация морфлинга:
morphlines : [
{
id : morphline-test-id
importCommands : ["org.kitesdk.**"]
commands : [
{
readJson{
}
}
]
}
]
почти только что прочитал. Что-то не так? Пожалуйста, помогите, спасибо!
Я проверил запись, используя этот код
morphlines : [
{
id : morphline-test-id
importCommands : ["org.kitesdk.**"]
commands : [
{
java {
imports : "import java.nio.charset.StandardCharsets;"
code: """
logger.debug("++++++++++@11111++++++record: {}", new String((byte[])(record.get("_attachment_body").get(0)),StandardCharsets.UTF_8));
logger.debug("++++++++++@2222++++++ is byte[]? {}", record.get("_attachment_body").get(0) instanceof byte[] );
return child.process(record); // pass record to next command in chain
"""
}
}
{
# Parse input attachment and emit a record for each input line
readLine {
charset : UTF-8
}
}
]
}
]
[_attachment_body] действительно байт []