Kafka Connect FileStreamSource игнорирует добавленные строки - PullRequest
0 голосов
/ 16 мая 2018

Я работаю над приложением для обработки журналов с помощью Spark, и я подумал использовать Kafka как способ для потоковой передачи данных из файла журнала.По сути, у меня есть один файл журнала (в локальной файловой системе), который постоянно обновляется новыми журналами, и Kafka Connect кажется идеальным решением для получения данных из файла вместе с новыми добавленными строками.

Я запускаю серверы с их конфигурациями по умолчанию с помощью следующих команд:

Сервер Zookeeper:

zookeeper-server-start.sh config/zookeeper.properties

zookeeper.properties

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0

Сервер Kafka:

kafka-server-start.sh config/server.properties

server.properties

broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
[...]

Затем я создал тему 'connect-test':

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic connect-test

И, наконец, я запускаю Kafka Connector:

connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

connect-standalone.properties

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connect-file-source.properties

name=my-file-connector
connector.class=FileStreamSource
tasks.max=1
file=/data/users/zamara/suivi_prod/app/data/logs.txt
topic=connect-test

Сначала я протестировал разъем, запустив простойПотребитель консоли:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

Все работало отлично, потребитель получал журналы из файла, и когда я добавлял журналы, потребитель продолжал обновлять новые.

(Затем я попробовал Spark как «потребителя», следуя этому руководству: https://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers и все было в порядке)

После этого я удалил некоторые журналы из журнала fи изменил тему (я удалил тему 'connect-test', создал другую и отредактировал свойства connect-file-source.properties новой темой).

Но теперь коннектор не работаеттак же и больше.При использовании консольного потребителя я получаю только те журналы, которые уже были в файле, и каждая новая строка, которую я добавляю, игнорируется.Возможно, изменение темы (и / или изменение данных из файла журнала) без изменения имени соединителя что-то сломало в Kafka ...

Это то, что Kafka Connect делает с моей темой "новая тема" и соединителем'new-file-connector',:

[2018-05-16 15:06:42,454] INFO Created connector new-file-connector (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-05-16 15:06:42,487] INFO Cluster ID: qjm74WJOSomos3pakXb2hA (org.apache.kafka.clients.Metadata:265)
[2018-05-16 15:06:42,522] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: new-topic-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:06:52,458] INFO WorkerSourceTask{id=new-file-connector-0} Finished commitOffsets successfully in 5 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:427)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:07:12,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:12,460] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)

(он продолжает сбрасывать 0 ожидающих сообщений даже после добавления новых строк в файл)

Поэтому я попытался начать все сначала: я удалилКаталог / tmp / kafka-logs, файл /tmp/connect.offset и использование совершенно нового имени темы, имени соединителя и файла журнала, на всякий случай.Но, тем не менее, коннектор игнорирует новые логи ... Я даже пытался удалить свою кафку, заново извлечь ее из архива и снова запустить весь процесс (на случай, если что-то изменилось в Кафке), но безуспешно.

Я не понимаю, в чем проблема, любая помощь будет признательна!

Ответы [ 2 ]

0 голосов
/ 17 мая 2018

Kafka Connect не "смотрит" и не "следит" за файлом.Я не верю, что где-то задокументировано, что он делает , что делает это.


Я бы сказал, что это даже менее полезно для чтения активных журналов, чем использование Spark Streaming для просмотра папки .Spark «распознает» вновь созданные файлы.Kafka Connect FileStreamSource должен указывать на один существующий неизменяемый файл.

Чтобы заставить Spark работать с активными журналами, вам понадобится что-то, что делает "ротацию журналов" - то есть, когда файл достигает максимального размера или условие, такое как конецпериод времени (скажем, день), затем этот процесс переместит активный журнал в каталог, который наблюдает Spark, затем он обрабатывает запуск нового файла журнала для вашего приложения, чтобы продолжить запись в него.


Если вы хотите, чтобы файлы активно просматривались и передавались в Kafka, тогда можно использовать Filebeat, Fluentd или Apache Flume.

0 голосов
/ 16 мая 2018

Per docs :

Примеры коннектора FileStream предназначены для того, чтобы показать, как работает простой коннектор для тех, кто только начинает работать с Kafka Connect как пользователь или разработчик.Это не рекомендуется для производственного использования.

Я бы использовал что-то вроде Filebeat (с его выводом Kafka) вместо того, чтобы глотать логи в Kafka.Или kafka-connect-spooldir , если ваши журналы не добавляются напрямую, а являются автономными файлами, помещенными в папку для загрузки.

...