I'm trying to stream logs from log4j2 to a Kafka topic.
My ZooKeeper and Kafka servers are running, and I have already created the topic. When the application logs, I get the following error:
Unable to write to Kafka in appender [Kafka] java.util.concurrent.TimeoutException: Timeout after waiting for 30000 ms.
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:76)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager.send(KafkaManager.java:116)
at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.tryAppend(KafkaAppender.java:169)
at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.append(KafkaAppender.java:150)
at org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:156)
at org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:129)
at org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:120)
at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:84)
at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:448)
at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:433)
at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:417)
at org.apache.logging.log4j.core.config.LoggerConfig.logParent(LoggerConfig.java:439)
at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:434)
at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:417)
at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:403)
at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:63)
at org.apache.logging.log4j.core.Logger.logMessage(Logger.java:146)
at org.apache.logging.slf4j.Log4jLogger.log(Log4jLogger.java:376)
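The top frames of the trace show where the 30000 ms comes from: KafkaManager.send blocks on the future returned by the producer (FutureRecordMetadata.get) and gives up when no result arrives in time. Below is a minimal, Kafka-free sketch of that pattern, using a plain CompletableFuture as a stand-in for the producer's future. The class name SendTimeoutSketch and the helper awaitAck are hypothetical and are not part of Log4j2 or Kafka.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SendTimeoutSketch {

    /**
     * Blocks on the future for at most timeoutMs, the way a synchronous send does.
     * Returns "acked" if the future completed in time and "timeout" if it did not.
     */
    public static String awaitAck(Future<?> ack, long timeoutMs) {
        try {
            ack.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "acked";
        } catch (TimeoutException e) {
            // No result arrived within the timeout: the situation in the stack trace.
            return "timeout";
        } catch (ExecutionException e) {
            // The future completed, but with an error.
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Stand-in for a send that is never acknowledged (hypothetical).
        Future<Void> neverAcked = new CompletableFuture<>();
        // Stand-in for a send that was acknowledged right away (hypothetical).
        Future<String> acked = CompletableFuture.completedFuture("offset 0");

        System.out.println(awaitAck(neverAcked, 200)); // prints "timeout"
        System.out.println(awaitAck(acked, 200));      // prints "acked"
    }
}
```

Read this way, the exception only says that the send was not completed within the timeout; it does not say why the broker never answered.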
My ZooKeeper and Kafka server logs are shown below.
zookeeper:
[2018-07-03 18:38:05,866] INFO Client attempting to establish new session at
/127.0.0.1:57207 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-07-03 18:38:05,867] INFO Creating new log file: log.cb
(org.apache.zookeeper.server.persistence.FileTxnLog)
[2018-07-03 18:38:05,899] INFO Established session 0x1646041bc4d0000 with
negotiated timeout 6000 for client /127.0.0.1:57207
(org.apache.zookeeper.server.ZooKeeperServer)
kafka:
[2018-07-03 18:38:05,807] INFO [ZooKeeperClient] Waiting until connected.
(kafka.zookeeper.ZooKeeperClient)
[2018-07-03 18:38:05,807] INFO Opening socket connection to server
127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL
(unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-07-03 18:38:05,810] INFO Socket connection established to
127.0.0.1/127.0.0.1:2181, initiating session
(org.apache.zookeeper.ClientCnxn)
[2018-07-03 18:38:05,901] INFO Session establishment complete on server
127.0.0.1/127.0.0.1:2181, sessionid = 0x1646041bc4d0000, negotiated timeout =
6000 (org.apache.zookeeper.ClientCnxn)
[2018-07-03 18:38:05,905] INFO [ZooKeeperClient] Connected.
(kafka.zookeeper.ZooKeeperClient)
And my consumer code is shown below:
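import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.19.102.93:9092");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
// Keys and values are deserialized as strings, so the consumer is typed <String, String>.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
// TestConsumerRebalanceListener is a ConsumerRebalanceListener implementation (class not shown).
TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener();
consumer.subscribe(Collections.singletonList("TestKafkaTopic"), rebalanceListener);
while (true) {
    // Wait up to 1 ms for new records.
    ConsumerRecords<String, String> records = consumer.poll(1);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    // Commit the offsets of the records returned by the last poll.
    consumer.commitSync();
}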
Log4j2 config:
<Kafka name="Kafka" topic="TestKafkaTopic">
    <PatternLayout pattern="|%-5p|%d{yyyy-MM-dd|HH:mm:ss,SSS}|%X{InterfaceId}|%X{SeqNo}|%X{Ouid} %X{srch1} %X{BussRef}|${sys:hostname}|${sys:ApplicationComponent}|%X{ExternalRefSend}|%m||%C{6}:%L|%t%n"/>
    <Property name="metadata.broker.list">****:9092</Property>
    <Property name="serializer.class">kafka.serializer.StringEncoder</Property>
    <Property name="bootstrap.servers">****:9092</Property>
</Kafka>
Any idea what I am doing wrong? The topic itself works: when I tested it with a standalone producer, the messages were sent, but with log4j2 the log messages never reach the consumer.
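One thing that can be checked from the machine running the application is whether each address in the appender's bootstrap.servers value accepts TCP connections at all. The following is a minimal sketch using only the JDK. The class BootstrapCheck and its methods are hypothetical helpers, and the default address in main is a placeholder, since the real host is masked in the config above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

public class BootstrapCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /**
     * Takes a bootstrap.servers style string ("host1:9092,host2:9092") and
     * reports, per entry, whether a TCP connection could be opened.
     */
    public static Map<String, Boolean> check(String bootstrapServers, int timeoutMs) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String entry : bootstrapServers.split(",")) {
            String hostPort = entry.trim();
            int colon = hostPort.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + hostPort);
            }
            String host = hostPort.substring(0, colon);
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            result.put(hostPort, canConnect(host, port, timeoutMs));
        }
        return result;
    }

    public static void main(String[] args) {
        // Placeholder address (assumption); pass the real bootstrap.servers value as the first argument.
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        check(servers, 3000).forEach((address, reachable) ->
                System.out.println(address + " reachable: " + reachable));
    }
}
```

A successful connection here only shows that the bootstrap address is reachable. The producer then connects to whatever addresses the broker advertises in its metadata, so those also have to be reachable from the application host.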