«Как загрузить строки из последней закрытой позиции, хранящейся в position_file.json, при использовании источника TAILDIR в трубе» - PullRequest
0 голосов
/ 10 января 2019

Я использую flume для загрузки потоковых данных из файла журнала с использованием источника TAILDIR.

Я уже использовал myagent.sources.source_name.positionFile = /path/to/position/file

Файл также создается и каждый раз имеет последнюю позицию чтения, но когда дело доходит до чтения, источник не читает с этой позиции.

Журналы, которые печатаются, имеют закрытый и открывающий оператор вместе с позицией, но при добавлении новых данных они читают из ранее закрытой позиции, а не из недавно закрытой.

Пожалуйста, найдите конфигурацию и журналы, напечатанные на экране вывода.

Конфигурация:

myagent.sources = a1
myagent.sinks = kafka-sink
myagent.channels = memoryChannel
myagent.sources.a1.channels = memoryChannel
myagent.sinks.kafka-sink.channel = memoryChannel
myagent.sources.a1.type = TAILDIR
myagent.sources.a1.filegroups = f1 
myagent.sources.a1.filegroups.f1 = /home/Documents/ecomMR/input.log
myagent.sources.a1.headers.f1.headerKey1 = value1
myagent.sources.a1.positionFile = /home/Documents/tail_position.json
myagent.sources.a1.maxBackoffSleep = 0
myagent.sources.a1.idleTimeout= 0
myagent.sources.a1.batchSize =10
myagent.sources.a1.writePosInterval= 1
myagent.sources.a1.maxBackoffSleep= 0
myagent.sources.a1.skipToEnd= true
myagent.sources.a1.backoffSleepIncrement= 0
myagent.sources.a1.fileHeader= true
myagent.sources.a1.byteOffsetHeader= true
myagent.channels.memoryChannel.type =memory
myagent.channels.memoryChannel.capacity = 200000
myagent.channels.memoryChannel.transactionCapacity = 1000
myagent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.kafka-sink.brokerList = localhost:2181
myagent.sinks.kafka-sink.kafka.topic = kafka-flume
myagent.sinks.kafka-sink.kafka.bootstrap.servers = localhost:9092
myagent.sinks.kafka-sink.kafka.flumeBatchSize = 100
myagent.sinks.kafka-sink.kafka.producer.acks = 1
myagent.sinks.kafka-sink.kafka.producer.linger.ms = 1
myagent.sinks.kafka-sink.kafka.producer.compression.type = snappy

Логи:

2019-01-10 12:40:45,618 (PollableSourceRunner-TaildirSource-a1) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /home/Documents/ecomMR/input.log, inode: 8524528, pos: 18

2019-01-10 12:40:58,633 (PollableSourceRunner-TaildirSource-a1) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:283)] Opening file: /home/Documents/ecomMR/input.log, inode: 8524533, pos: 0

2019-01-10 12:41:00,638 (PollableSourceRunner-TaildirSource-a1) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /home/Documents/ecomMR/input.log, inode: 8524533, pos: 24

2019-01-10 12:41:03,640 (PollableSourceRunner-TaildirSource-a1) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:283)] Opening file: /home/Documents/ecomMR/input.log, inode: 8524528, pos: 18

2019-01-10 12:41:05,642 (PollableSourceRunner-TaildirSource-a1) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /home/Documents/ecomMR/input.log, inode: 8524528, pos: 29

2019-01-10 12:42:42,692 (PollableSourceRunner-TaildirSource-a1) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:283)] Opening file: /home/Documents/ecomMR/input.log, inode: 8524533, pos: 24

2019-01-10 12:42:45,694 (PollableSourceRunner-TaildirSource-a1) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /home/Documents/ecomMR/input.log, inode: 8524533, pos: 33

Заранее признателен за решение.

...