Получил исключение при вызове readJson MorphlineInterceptor - PullRequest
0 голосов
/ 17 октября 2019

Я просто хочу получить некоторые данные Json из Kafka, а затем выполнить какое-то преобразование с помощью Morphline. Я просто получаю это исключение:

ОШИБКА kafka.KafkaSource: KafkaSource EXCEPTION, {} org.apache.flume.FlumeException:> org.apache.flume.sink.solr.morphline.MorphlineInterceptor $ LocalMorphlineInterc>eptor не должен генерировать вложения, которые не являются байтами [] или InputStream

, это моя конфигурация потока:


# Name the components on this agent
a1.sources = kafka-source
a1.sinks = console
a1.channels = mem-channel

# Describe/configure the source
a1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.kafka-source.batchSize = 100
a1.sources.kafka-source.kafka.bootstrap.servers = 10.20.80.220:9092
a1.sources.kafka-source.kafka.topics = worm_video_info
a1.sources.kafka-source.kafka.consumer.group.id = flume-consumer-2
a1.sources.kafka-source.kafka.consumer.auto.offset.reset=earliest
a1.sources.kafka-source.interceptors = morphlineinterceptor
a1.sources.kafka-source.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.kafka-source.interceptors.morphlineinterceptor.morphlineFile = morphline-test.conf
a1.sources.kafka-source.interceptors.morphlineinterceptor.morphlineId = morphline-test-id


# Describe the sink
a1.sinks.console.type = logger


# Use a channel which buffers events in memory
a1.channels.mem-channel.type = memory
a1.channels.mem-channel.capacity = 1000
a1.channels.mem-channel.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.kafka-source.channels = mem-channel
a1.sinks.console.channel = mem-channel

, это моя конфигурация морфлинга:

morphlines : [
    {
        id : morphline-test-id
        importCommands : ["org.kitesdk.**"]

        commands : [
            {
                readJson{
                }
            }

        ]
    }
]

почти только что прочитал. Что-то не так? Пожалуйста, помогите, спасибо!

Я проверил запись, используя этот код

morphlines : [
    {
        id : morphline-test-id
        importCommands : ["org.kitesdk.**"]

        commands : [
            {
                java {
          imports : "import java.nio.charset.StandardCharsets;"
                    code: """
                    logger.debug("++++++++++@11111++++++record: {}", new String((byte[])(record.get("_attachment_body").get(0)),StandardCharsets.UTF_8));
          logger.debug("++++++++++@2222++++++ is byte[]? {}", record.get("_attachment_body").get(0) instanceof byte[] );                                            
                    return child.process(record); // pass record to next command in chain
                            """

                }
            }
            {
                # Parse input attachment and emit a record for each input line              
                readLine {
                    charset : UTF-8
                }
            }

        ]
    }
]

[_attachment_body] действительно байт []

...