SpringConot AutoConfigurato не работает в Kafka Logback Appender - PullRequest
0 голосов
/ 14 сентября 2018

У меня есть приложение для входа в систему kafka, которое содержит базовый файл logback.xml для отправки событий журнала в kafka.Я создал этот приложение для входа в систему kafka в качестве начального загрузочного приложения, чтобы я мог добавить этот jar-файл в любое клиентское приложение и перенести события журнала в раздел kafka, определенный в client.properties.

Компоненты: 1) kafka-logback-starter 2) Клиентское приложение

В kafka-logback-starter я читаю свойства приложения из клиентского приложения.

Вот моя конфигурация:

1) Добавленоspring.factories в kafka-logback-starter

2)

 @Configuration
 @EnableConfigurationProperties(MyKafkaProperties.class)
 public abstract class KafkaAppenderConfig<E> extends 
 UnsynchronizedAppenderBase<E> implements AppenderAttachable<E> {

 @Autowired
 private MyKafkaProperties myKafkaProperties;

 @Bean
 public KafkaConfig getKafkaConfig() {
    KafkaConfig kf = new KafkaConfig();
    kafka.put("Topic",sreKafkaProperties.getTopicName()); 
  **I see the values from client application.properties.**
    return kf;

 }

public void addProducerConfig(String keyValue) {
    System.out.println(getKafkaConfig().get("Topic")); //returns null
    System.out.println(myKafkaProperties.getTopic()); //returns null

}

Код суперкласса:

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.spi.AppenderAttachableImpl;
import 
com.github.danielwegener.logback.kafka.delivery.FailedDeliveryCallback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;


public class KafkaAppender<E> extends KafkaAppenderConfig<E> {

/**
 * Kafka clients uses this prefix for its slf4j logging.
 * This appender defers appends of any Kafka logs since it could cause harmful infinite recursion/self feeding effects.
 */
private static final String KAFKA_LOGGER_PREFIX = KafkaProducer.class.getPackage().getName().replaceFirst("\\.producer$", "");

private LazyProducer lazyProducer = null;
private final AppenderAttachableImpl<E> aai = new AppenderAttachableImpl<E>();
private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
private final FailedDeliveryCallback<E> failedDeliveryCallback = new FailedDeliveryCallback<E>() {
    @Override
    public void onFailedDelivery(E evt, Throwable throwable) {
        aai.appendLoopOnAppenders(evt);
    }
};

public KafkaAppender() {
    // setting these as config values sidesteps an unnecessary warning (minor bug in KafkaProducer)
    addProducerConfigValue(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    addProducerConfigValue(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
}

@Override
public void doAppend(E e) {
    ensureDeferredAppends();
    if (e instanceof ILoggingEvent && ((ILoggingEvent)e).getLoggerName().startsWith(KAFKA_LOGGER_PREFIX)) {
        deferAppend(e);
    } else {
        super.doAppend(e);
    }
}

@Override
public void start() {
    // only error free appenders should be activated
    if (!checkPrerequisites()) return;

    if (partition != null && partition < 0) {
        partition = null;
    }

    lazyProducer = new LazyProducer();

    super.start();
}

@Override
public void stop() {
    super.stop();
    if (lazyProducer != null && lazyProducer.isInitialized()) {
        try {
            lazyProducer.get().close();
        } catch (KafkaException e) {
            this.addWarn("Failed to shut down kafka producer: " + e.getMessage(), e);
        }
        lazyProducer = null;
    }
}

@Override
public void addAppender(Appender<E> newAppender) {
    aai.addAppender(newAppender);
}

@Override
public Iterator<Appender<E>> iteratorForAppenders() {
    return aai.iteratorForAppenders();
}

@Override
public Appender<E> getAppender(String name) {
    return aai.getAppender(name);
}

@Override
public boolean isAttached(Appender<E> appender) {
    return aai.isAttached(appender);
}

@Override
public void detachAndStopAllAppenders() {
    aai.detachAndStopAllAppenders();
}

@Override
public boolean detachAppender(Appender<E> appender) {
    return aai.detachAppender(appender);
}

@Override
public boolean detachAppender(String name) {
    return aai.detachAppender(name);
}

@Override
protected void append(E e) {
    final byte[] payload = encoder.encode(e);
    final byte[] key = keyingStrategy.createKey(e);

    final Long timestamp = isAppendTimestamp() ? getTimestamp(e) : null;

    final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, partition, timestamp, key, payload);

    final Producer<byte[], byte[]> producer = lazyProducer.get();
    if (producer != null) {
        deliveryStrategy.send(lazyProducer.get(), record, e, failedDeliveryCallback);
    } else {
        failedDeliveryCallback.onFailedDelivery(e, null);
    }
}

protected Long getTimestamp(E e) {
    if (e instanceof ILoggingEvent) {
        return ((ILoggingEvent) e).getTimeStamp();
    } else {
        return System.currentTimeMillis();
    }
}

protected Producer<byte[], byte[]> createProducer() {
    return new KafkaProducer<>(new HashMap<>(producerConfig));
}

private void deferAppend(E event) {
    queue.add(event);
}

// drains queue events to super
private void ensureDeferredAppends() {
    E event;

    while ((event = queue.poll()) != null) {
        super.doAppend(event);
    }
}


private class LazyProducer {

    private volatile Producer<byte[], byte[]> producer;

    public Producer<byte[], byte[]> get() {
        Producer<byte[], byte[]> result = this.producer;
        if (result == null) {
            synchronized(this) {
                result = this.producer;
                if(result == null) {
                    this.producer = result = this.initialize();
                }
            }
        }

        return result;
    }

    protected Producer<byte[], byte[]> initialize() {
        Producer<byte[], byte[]> producer = null;
        try {
            producer = createProducer();
        } catch (Exception e) {
            addError("error creating producer", e);
        }
        return producer;
    }

    public boolean isInitialized() { return producer != null; }
}

}

Я вижу, что @Autowired isвызывается после выполнения метода addProducerConfig, поэтому значения свойств приложения недоступны в addProducerConfig.

Как убедиться, что @Autowired вызывается до того, как какие-либо методы выполняются в KafkaAppenderConfig?

ПРИМЕЧАНИЕ. KafkaAppenderConfigабстрактный класс.

1 Ответ

0 голосов
/ 16 сентября 2018

вы можете реализовать InitializingBean и вызвать конфигурацию вашего производителя в afterPropertiesSet

public class KafkaAppender<E> extends KafkaAppenderConfig<E> implements InitializingBean {

    @Override
    public void afterPropertiesSet() throws Exception {
        // setting these as config values sidesteps an unnecessary warning (minor bug in KafkaProducer)
        addProducerConfigValue(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        addProducerConfigValue(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...