Сценарий, который я пытаюсь сделать, выглядит следующим образом:
1- Поток TAILDIR Источник читает файл журнала и добавляет статический перехватчик в начало сообщения.Перехватчик состоит из имени хоста и IP-адреса хоста, необходимого для каждого полученного сообщения журнала.
2 - Fluke Kafka Producer Sink, который берет эти сообщения из файла и помещает их в тему Kafka.
Конфигурация Flume выглядит следующим образом:
tier1.sources=source1
tier1.channels=channel1
tier1.sinks =sink1
tier1.sources.source1.interceptors=i1
tier1.sources.source1.interceptors.i1.type=static
tier1.sources.source1.interceptors.i1.key=HostData
tier1.sources.source1.interceptors.i1.value=###HostName###000.00.0.000###
tier1.sources.source1.type=TAILDIR
tier1.sources.source1.positionFile=/usr/software/flumData/flumeStressAndKafkaFailureTestPos.json
tier1.sources.source1.filegroups=f1
tier1.sources.source1.filegroups.f1=/usr/software/flumData/flumeStressAndKafkaFailureTest.txt
tier1.sources.source1.channels=channel1
tier1.channels.channel1.type=file
tier1.channels.channel1.checkpointDir = /usr/software/flumData/checkpoint
tier1.channels.channel1.dataDirs = /usr/software/flumData/data
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.kafka.bootstrap.servers=<Removed For Confidentiality >
tier1.sinks.sink1.kafka.topic=FlumeTokafkaTest
tier1.sinks.sink1.kafka.flumeBatchSize=20
tier1.sinks.sink1.kafka.producer.acks=0
tier1.sinks.sink1.useFlumeEventFormat=true
tier1.sinks.sink1.kafka.producer.linger.ms=1
tier1.sinks.sink1.kafka.producer.client.id=HOSTNAME
tier1.sinks.sink1.kafka.producer.compression.type = snappy
Итак, теперь я тестирую, я запустил Console Kafka Consumer и начал писать в исходном файле, и я получаю сообщение сдобавлен заголовок.
Пример:
Я пишу 'test' в исходном файле и нажимаю Enter, затем сохраняю файл
Flume обнаруживает изменение файла, затем отправляет новыйлиния к производителю Kafka.
Мой потребитель получает следующую строку:
###HostName###000.00.0.000###test
Проблема в том, что иногда перехватчик не работает должным образом.Это как будто Flume отправляет 2 сообщения, одно содержит перехватчик, а другое - содержание сообщения.
Пример:
Я пишу «привет тебе» в исходном файле и нажимаю Enter, затем сохраняю файл
Flume обнаруживает изменение файла, затем отправляет новую строку производителю Kafka.
Мой потребитель получает следующую строку:
###HostName###000.00.0.000###
hi you
И терминал прокручивается доновое содержание сообщения.
Этот случай всегда происходит, когда я набираю «привет тебе» в текстовом файле, и поскольку я читаю из файла журнала, то это не предсказуемо, когда это происходит.
Помощь и поддержка будут высоко ценится ^^
Спасибо