I have two hosts in two different virtual machines: Flume runs on a CentOS 7 host and Kafka runs on a Cloudera host. I connected the two and configured Kafka on the Cloudera host as the sink, as shown in the configuration below:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/Bureau/V1/outputCV
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = timestampInterceptor
a1.sources.r1.interceptors.timestampInterceptor.type = timestamp
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flume-topic
a1.sinks.k1.kafka.bootstrap.servers = 192.168.5.129:9090,192.168.5.129:9091,192.168.5.129:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
I get this error when Flume tries to send messages to Kafka:
[root@localhost V1]# /root/flume/bin/flume-ng agent --name a1 --conf-file /root/Bureau/V1/flumeconf/flumetest.conf
Warning: No configuration directory set! Use --conf <dir> to override.
Warning: JAVA_HOME is not set!
Info: Including Hadoop libraries found via (/root/hadoop/bin/hadoop) for HDFS access
Info: Including Hive libraries found via () for Hive access
+ exec /usr/bin/java -Xmx20m -cp '/root/flume/lib/*:/root/hadoop/etc/hadoop:/root/hadoop/share/hadoop/common/lib/*:/root/hadoop/share/hadoop/common/*:/root/hadoop/share/hadoop/hdfs:/root/hadoop/share/hadoop/hdfs/lib/*:/root/hadoop/share/hadoop/hdfs/*:/root/hadoop/share/hadoop/mapreduce/lib/*:/root/hadoop/share/hadoop/mapreduce/*:/root/hadoop/share/hadoop/yarn:/root/hadoop/share/hadoop/yarn/lib/*:/root/hadoop/share/hadoop/yarn/*:/lib/*' -Djava.library.path=:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib org.apache.flume.node.Application --name a1 --conf-file /root/Bureau/V1/flumeconf/flumetest.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/flume/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-05-07 19:04:41,363 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
2019-05-07 19:04:41,366 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/root/Bureau/V1/flumeconf/flumetest.conf
2019-05-07 19:04:41,374 INFO conf.FlumeConfiguration: Processing:k1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:c1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:r1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:k1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:r1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:k1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:k1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:k1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:r1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:k1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:r1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:r1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:c1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:r1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:k1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:c1
2019-05-07 19:04:41,375 INFO conf.FlumeConfiguration: Processing:k1
2019-05-07 19:04:41,375 WARN conf.FlumeConfiguration: Agent configuration for 'a1' has no configfilters.
2019-05-07 19:04:41,392 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
2019-05-07 19:04:41,392 INFO node.AbstractConfigurationProvider: Creating channels
2019-05-07 19:04:41,397 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
2019-05-07 19:04:41,400 INFO node.AbstractConfigurationProvider: Created channel c1
2019-05-07 19:04:41,401 INFO source.DefaultSourceFactory: Creating instance of source r1, type spooldir
2019-05-07 19:04:41,415 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: org.apache.flume.sink.kafka.KafkaSink
2019-05-07 19:04:41,420 INFO kafka.KafkaSink: Using the static topic flume-topic. This may be overridden by event headers
2019-05-07 19:04:41,426 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
2019-05-07 19:04:41,432 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Spool Directory source r1: { spoolDir: /root/Bureau/V1/outputCV } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@492cf580 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2019-05-07 19:04:41,432 INFO node.Application: Starting Channel c1
2019-05-07 19:04:41,492 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2019-05-07 19:04:41,492 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2019-05-07 19:04:41,492 INFO node.Application: Starting Sink k1
2019-05-07 19:04:41,494 INFO node.Application: Starting Source r1
2019-05-07 19:04:41,494 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /root/Bureau/V1/outputCV
2019-05-07 19:04:41,525 INFO producer.ProducerConfig: ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [192.168.5.129:9090, 192.168.5.129:9091, 192.168.5.129:9092]
buffer.memory = 33554432
client.id =
compression.type = snappy
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 1
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2019-05-07 19:04:41,536 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-05-07 19:04:41,536 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-05-07 19:04:41,591 INFO utils.AppInfoParser: Kafka version : 2.0.1
2019-05-07 19:04:41,591 INFO utils.AppInfoParser: Kafka commitId : fa14705e51bd2ce5
2019-05-07 19:04:41,592 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2019-05-07 19:04:41,592 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
2019-05-07 19:05:01,780 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2019-05-07 19:05:01,781 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /root/Bureau/V1/outputCV/cv.txt to /root/Bureau/V1/outputCV/cv.txt.COMPLETED
2019-05-07 19:05:03,673 WARN clients.NetworkClient: [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.
2019-05-07 19:05:03,776 WARN clients.NetworkClient: [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.
2019-05-07 19:05:03,856 WARN clients.NetworkClient: [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.
2019-05-07 19:05:03,910 INFO clients.Metadata: Cluster ID: F_Byx5toQju8jaLb3zFwAA
2019-05-07 19:05:04,084 WARN clients.NetworkClient: [Producer clientId=producer-1] Error connecting to node quickstart.cloudera:9092 (id: 0 rack: null)
java.io.IOException: Can't resolve address: quickstart.cloudera:9092
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:265)
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:266)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233)
... 7 more
2019-05-07 19:05:04,140 WARN clients.NetworkClient: [Producer clientId=producer-1] Error connecting to node quickstart.cloudera:9092 (id: 0 rack: null)
java.io.IOException: Can't resolve address: quickstart.cloudera:9092
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:265)
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:266)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233)
... 7 more
2019-05-07 19:05:04,231 WARN clients.NetworkClient: [Producer clientId=producer-1] Error connecting to node quickstart.cloudera:9092 (id: 0 rack: null)
java.io.IOException: Can't resolve address: quickstart.cloudera:9092
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:265)
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:266)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233)
... 7 more
2019-05-07 19:05:04,393 WARN clients.NetworkClient: [Producer clientId=producer-1] Error connecting to node quickstart.cloudera:9092 (id: 0 rack: null)
The "Error connecting to node" warning only appears after messages are sent.
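In the log, the producer fetches cluster metadata through the bootstrap address 192.168.5.129 and only then fails on `quickstart.cloudera:9092`, which suggests (this is my assumption, not something I have confirmed) that the hostname the broker reports is not resolvable from the Flume host. Below is a minimal sketch of how that could be checked from the CentOS machine. The helper names `split_host_port` and `can_resolve` are hypothetical and are not part of Flume or Kafka; only the standard-library call `socket.getaddrinfo` is used.

```python
import socket


def split_host_port(endpoint):
    """Split a 'host:port' string into (host, port)."""
    host, _, port = endpoint.rpartition(":")
    return host, int(port)


def can_resolve(host, port):
    """Return True if the local resolver can map host to an address."""
    try:
        socket.getaddrinfo(host, port)
        return True
    except socket.gaierror:
        # Raised when the name cannot be resolved on this machine.
        return False
```

Calling `can_resolve(*split_host_port("quickstart.cloudera:9092"))` on the Flume host should show whether the name taken from the log resolves there. A numeric address such as `192.168.5.129` passes this check without any DNS lookup, so a `False` result for the hostname alone would point at name resolution rather than at the network link between the two VMs.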
Can anyone help me solve this problem?