Я работаю над приложением для обработки журналов с помощью Spark, и я подумал использовать Kafka как способ для потоковой передачи данных из файла журнала.По сути, у меня есть один файл журнала (в локальной файловой системе), который постоянно обновляется новыми журналами, и Kafka Connect кажется идеальным решением для получения данных из файла вместе с новыми добавленными строками.
Я запускаю серверы с их конфигурациями по умолчанию с помощью следующих команд:
Сервер Zookeeper:
zookeeper-server-start.sh config/zookeeper.properties
zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
Сервер Kafka:
kafka-server-start.sh config/server.properties
server.properties
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
[...]
Затем я создал тему 'connect-test':
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic connect-test
И, наконец, я запускаю Kafka Connector:
connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
connect-file-source.properties
name=my-file-connector
connector.class=FileStreamSource
tasks.max=1
file=/data/users/zamara/suivi_prod/app/data/logs.txt
topic=connect-test
Сначала я протестировал разъем, запустив простойПотребитель консоли:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
Все работало отлично, потребитель получал журналы из файла, и когда я добавлял журналы, потребитель продолжал обновлять новые.
(Затем я попробовал Spark как «потребителя», следуя этому руководству: https://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers и все было в порядке)
После этого я удалил некоторые журналы из журнала fи изменил тему (я удалил тему 'connect-test', создал другую и отредактировал свойства connect-file-source.properties новой темой).
Но теперь коннектор не работаеттак же и больше.При использовании консольного потребителя я получаю только те журналы, которые уже были в файле, и каждая новая строка, которую я добавляю, игнорируется.Возможно, изменение темы (и / или изменение данных из файла журнала) без изменения имени соединителя что-то сломало в Kafka ...
Это то, что Kafka Connect делает с моей темой "новая тема" и соединителем'new-file-connector',:
[2018-05-16 15:06:42,454] INFO Created connector new-file-connector (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-05-16 15:06:42,487] INFO Cluster ID: qjm74WJOSomos3pakXb2hA (org.apache.kafka.clients.Metadata:265)
[2018-05-16 15:06:42,522] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: new-topic-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:06:52,458] INFO WorkerSourceTask{id=new-file-connector-0} Finished commitOffsets successfully in 5 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:427)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:07:12,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:12,460] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
(он продолжает сбрасывать 0 ожидающих сообщений даже после добавления новых строк в файл)
Поэтому я попытался начать все сначала: я удалилКаталог / tmp / kafka-logs, файл /tmp/connect.offset и использование совершенно нового имени темы, имени соединителя и файла журнала, на всякий случай.Но, тем не менее, коннектор игнорирует новые логи ... Я даже пытался удалить свою кафку, заново извлечь ее из архива и снова запустить весь процесс (на случай, если что-то изменилось в Кафке), но безуспешно.
Я не понимаю, в чем проблема, любая помощь будет признательна!