Невозможно написать Кафке в appender [Kafka] java.util.concurrent.TimeoutException в Loh4j2 - PullRequest
0 голосов
/ 03 июля 2018

Я пробую потоковые журналы из log4j2 в топик Кафки. Мой сервер zookeeper & kafka работает. Я уже создал тему для этой же темы.

 Unable to write to Kafka in appender [Kafka] java.util.concurrent.TimeoutException: Timeout after waiting for 30000 ms.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:76)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
    at org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager.send(KafkaManager.java:116)
    at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.tryAppend(KafkaAppender.java:169)
    at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.append(KafkaAppender.java:150)
    at org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:156)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:129)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:120)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:84)
    at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:448)
    at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:433)
    at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:417)
    at org.apache.logging.log4j.core.config.LoggerConfig.logParent(LoggerConfig.java:439)
    at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:434)
    at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:417)
    at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:403)
    at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:63)
    at org.apache.logging.log4j.core.Logger.logMessage(Logger.java:146)
    at org.apache.logging.slf4j.Log4jLogger.log(Log4jLogger.java:376)

Мой зоопарк и кафка, как показано ниже

[2018-07-03 18:38:05,866] INFO Client attempting to establish new session at 
/127.0.0.1:57207 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-07-03 18:38:05,867] INFO Creating new log file: log.cb 
(org.apache.zookeeper.server.persistence.FileTxnLog)
[2018-07-03 18:38:05,899] INFO Established session 0x1646041bc4d0000 with 
negotiated timeout 6000 for client /127.0.0.1:57207 
(org.apache.zookeeper.server.ZooKeeperServer)
kafka:
[2018-07-03 18:38:05,807] INFO [ZooKeeperClient] Waiting until connected. 
(kafka.zookeeper.ZooKeeperClient)
[2018-07-03 18:38:05,807] INFO Opening socket connection to server 
127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL 
(unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-07-03 18:38:05,810] INFO Socket connection established to 
127.0.0.1/127.0.0.1:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-07-03 18:38:05,901] INFO Session establishment complete on server 
127.0.0.1/127.0.0.1:2181, sessionid = 0x1646041bc4d0000, negotiated timeout = 
6000 (org.apache.zookeeper.ClientCnxn)
[2018-07-03 18:38:05,905] INFO [ZooKeeperClient] Connected. 
(kafka.zookeeper.ZooKeeperClient)

И потребительский код, как показано ниже

Properties consumerConfig = new Properties();
  consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.19.102.93:9092");
  consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
  consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
 "org.apache.kafka.common.serialization.StringDeserializer");
  consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
 "org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>
 (consumerConfig);
  TestConsumerRebalanceListener rebalanceListener = new 
  TestConsumerRebalanceListener();
  consumer.subscribe(Collections.singletonList("TestKafkaTopic"), 
  rebalanceListener);

  while (true) {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(1);
      for (ConsumerRecord<byte[], byte[]> record : records) {
          System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
      }

      consumer.commitSync();
  }

Log4j2 config:

 <Kafka name="Kafka" topic="TestKafkaTopic">
                  <PatternLayout pattern="|%-5p|%d{yyyy-MM-dd|HH:mm:ss,SSS}|%X{InterfaceId}|%X{SeqNo}|%X{Ouid} %X{srch1} %X{BussRef}|${sys:hostname}|${sys:ApplicationComponent}|%X{ExternalRefSend}|%m||%C{6}:%L|%t%n"/>
<Property name="metadata.broker.list">****:9092</Property>
        <Property 
 name="serializer.class">kafka.serializer.StringEncoder</Property>
        <Property name="bootstrap.servers">****:9092</Property>

    </Kafka>

Любая идея, что я делаю не так. Поскольку моя тема работает с отдельным кодом, когда я тестировал отправку сообщений с битом продюсера с log4j2, он не смог отправить ошибки потребителю.

1 Ответ

0 голосов
/ 02 февраля 2019

Я тоже сталкивался с такой же проблемой с приложением log4j2 kafka. Сбой из-за того, что уровень корневого журнала был установлен для трассировки. Я пытался поднять уровень регистратора для org.apache.kafka до уровня, равного или выше, чем INFO, но все остальное регистрируется на желаемом уровне журнала (например, трассировка, отладка). После этого он начал работать.

Ссылка: https://github.com/danielwegener/logback-kafka-appender/issues/44

...