Поток TAILDIR Источник к раковине Кафки - выпуск статического перехватчика - PullRequest
0 голосов
/ 13 февраля 2019

Сценарий, который я пытаюсь сделать, выглядит следующим образом:

1- Поток TAILDIR Источник читает файл журнала и добавляет статический перехватчик в начало сообщения.Перехватчик состоит из имени хоста и IP-адреса хоста, необходимого для каждого полученного сообщения журнала.

2 - Fluke Kafka Producer Sink, который берет эти сообщения из файла и помещает их в тему Kafka.

Конфигурация Flume выглядит следующим образом:

tier1.sources=source1
tier1.channels=channel1
tier1.sinks =sink1

tier1.sources.source1.interceptors=i1


tier1.sources.source1.interceptors.i1.type=static
tier1.sources.source1.interceptors.i1.key=HostData
tier1.sources.source1.interceptors.i1.value=###HostName###000.00.0.000###


tier1.sources.source1.type=TAILDIR
tier1.sources.source1.positionFile=/usr/software/flumData/flumeStressAndKafkaFailureTestPos.json
tier1.sources.source1.filegroups=f1
tier1.sources.source1.filegroups.f1=/usr/software/flumData/flumeStressAndKafkaFailureTest.txt
tier1.sources.source1.channels=channel1

tier1.channels.channel1.type=file
tier1.channels.channel1.checkpointDir = /usr/software/flumData/checkpoint
tier1.channels.channel1.dataDirs = /usr/software/flumData/data



tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.kafka.bootstrap.servers=<Removed For Confidentiality >
tier1.sinks.sink1.kafka.topic=FlumeTokafkaTest
tier1.sinks.sink1.kafka.flumeBatchSize=20
tier1.sinks.sink1.kafka.producer.acks=0
tier1.sinks.sink1.useFlumeEventFormat=true
tier1.sinks.sink1.kafka.producer.linger.ms=1
tier1.sinks.sink1.kafka.producer.client.id=HOSTNAME
tier1.sinks.sink1.kafka.producer.compression.type = snappy

Итак, теперь я тестирую, я запустил Console Kafka Consumer и начал писать в исходном файле, и я получаю сообщение сдобавлен заголовок.

Пример:

Я пишу 'test' в исходном файле и нажимаю Enter, затем сохраняю файл

Flume обнаруживает изменение файла, затем отправляет новыйлиния к производителю Kafka.

Мой потребитель получает следующую строку:

###HostName###000.00.0.000###test

Проблема в том, что иногда перехватчик не работает должным образом.Это как будто Flume отправляет 2 сообщения, одно содержит перехватчик, а другое - содержание сообщения.

Пример:

Я пишу «привет тебе» в исходном файле и нажимаю Enter, затем сохраняю файл

Flume обнаруживает изменение файла, затем отправляет новую строку производителю Kafka.

Мой потребитель получает следующую строку:

###HostName###000.00.0.000###
hi you

И терминал прокручивается доновое содержание сообщения.

Этот случай всегда происходит, когда я набираю «привет тебе» в текстовом файле, и поскольку я читаю из файла журнала, то это не предсказуемо, когда это происходит.

Помощь и поддержка будут высоко ценится ^^

Спасибо

1 Ответ

0 голосов
/ 19 февраля 2019

Так что проблема была от Kafka Consumer.Он получает полное сообщение от flume

Interceptor + some garbage characters + message

и, если один из символов мусора был \ n (LF в системах Linux), он принимает 2 сообщения, а не 1.

I 'Я использую элемент Kafka Consumer в Streamsets, поэтому легко изменить разделитель сообщений.Я сделал это \ r \ n и теперь он работает нормально.

Если вы имеете дело с полным сообщением в виде строки и хотите применить к нему регулярное выражение или хотите записать его в файл, тогда лучшезаменить \ r и \ n пустой строкой.

Полный обзор ответа можно найти здесь:

https://community.cloudera.com/t5/Data-Ingestion-Integration/Flume-TAILDIR-Source-to-Kafka-Sink-Static-Interceptor-Issue/m-p/86388#M3508

...