Как я могу записывать данные в тему кафки и hdfs одновременно с flume - PullRequest
0 голосов
/ 11 сентября 2018

Я читаю текстовые файлы и хочу записать данные с flume в тему kafka и hdfs. Вы можете увидеть мою конфигурацию ниже. Я реализую источник в приложении Flume, и я читаю из текстового файла и создаю событие, чтобы передать его в hdfs, а также хочу манипулировать данными и передать событие в kafka, но я не смог записать их обоих одновременно.

Я имею в виду, что хочу создать другое событие, подобное этому, и это подтолкнет манипулируемые данные в другую тему kafka:

 Event event = EventBuilder.withBody(newline.toString().getBytes("UTF-8"));
 pushEvent(event);

Можете ли вы дать мне совет?

agent1.sources = source1 source2
agent1.sources.source1.type = com.intellimap.flume.source.FileSource
agent1.sources.source1.file.owner = myuser
agent1.sources.source1.input.dir = /data/inputdir
agent1.sources.source1.reader.count = 60
agent1.sources.source1.redis.ip=127.0.0.1
agent1.sources.source1.log.configuration=/appdata/myuser/FlumePlugin/log4j.properties
agent1.sources.source2.log.configuration=/appdata/myuser/FlumePlugin/log4j.properties
agent1.channels = channel2 channel3

agent1.channels.channel2.type = memory
agent1.channels.channel2.capacity = 10000000
agent1.channels.channel2.transactionCapactiy = 1000
agent1.channels.channel3.type = memory
agent1.channels.channel3.capacity = 1000000
agent1.channels.channel3.transactionCapactiy = 100

agent1.sources.source1.channels = channel2
agent1.sources.source2.channels = channel3

agent1.sinks = sink1 sink2
agent1.sources.source1.selector.type = multiplexing
agent1.sources.source1.selector.header =timestamp
agent1.sources.source1.selector.default = channel2
agent1.sources.source2.selector.type = multiplexing
agent1.sources.source2.selector.header =timestamp
agent1.sources.source2.selector.default = channel3

agent1.sinks.sink1.channel = channel2
agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.kafka.topic = topic_x
agent1.sinks.sink1.kafka.bootstrap.servers = pappd03:9092,pappd04:9092,pappd05:9092
agent1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
agent1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
agent1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka

agent1.sources.source2.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.source2.channels = channel3
agent1.sources.source2.batchSize = 5000
agent1.sources.source2.batchDurationMillis = 2000
agent1.sources.source2.kafka.bootstrap.servers = pappd03:9092, pappd04:9092, pappd05:9092
agent1.sources.source2.kafka.topics = topic_x
agent1.sources.source2.kafka.consumer.security.protocol = SASL_PLAINTEXT
agent1.sources.source2.kafka.consumer.sasl.kerberos.service.name = kafka
agent1.sources.source2.kafka.consumer.group.id = intellimap

agent1.sinks.sink2.type = hdfs
agent1.sinks.sink2.channel = channel3
agent1.sinks.sink2.hdfs.useLocalTimeStamp = true
agent1.sinks.sink2.hdfs.path = /user/hive/warehouse/my.db/x/%Y%m%d%H
agent1.sinks.sink2.hdfs.filePrefix = xdr
agent1.sinks.sink2.hdfs.fileType = DataStream
agent1.sinks.sink2.hdfs.writeFormat = Text
agent1.sinks.sink2.hdfs.rollCount = 10000000
agent1.sinks.sink2.hdfs.rollSize = 1000
agent1.sinks.sink2.hdfs.batchSize = 100000
agent1.sinks.sink2.hdfs.codeC = snappy
agent1.sinks.sink2.hdfs.fileType = CompressedStream
...