Крючки в Кафке Слушатель - PullRequest
0 голосов
/ 22 января 2020

Есть ли какие-либо виды хуков, доступные до / после того, как kafka прослушивает сообщение?

Вариант использования: MD C Должен быть установлен идентификатор взаимосвязи для выполнения трассировки журнала

Что я ищу? Метод обратного вызова до / после, так что идентификатор входа MD C может быть установлен при входе и, в конечном итоге, очищен MD C при выходе.

Отредактированный сценарий: Я получаю идентификатор взаимосвязи как часть заголовков Kafka, и я хочу установить его в MD C, как только получаю сообщение в слушателе Kafka

Признан за помощь

1 Ответ

1 голос
/ 22 января 2020

Вы можете добавить совет для слушателя ...

@SpringBootApplication
public class So59854374Application {

    public static void main(String[] args) {
        SpringApplication.run(So59854374Application.class, args);
    }

    @Bean
    public static BeanPostProcessor bpp() { // static is important
        return new BeanPostProcessor() {

            @Override
            public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                if (bean instanceof MyListener) {
                    ProxyFactoryBean pfb = new ProxyFactoryBean();
                    pfb.setTarget(bean);
                    pfb.addAdvice(new MethodInterceptor() {

                        @Override
                        public Object invoke(MethodInvocation invocation) throws Throwable {
                            try {
                                System.out.println("Before");
                                return invocation.proceed();
                            }
                            finally {
                                System.out.println("After");
                            }
                        }

                    });
                    return pfb.getObject();
                }
                return bean;
            }

        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so59854374").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("so59854374", "foo");
    }

}

@Component
class MyListener {

    @KafkaListener(id = "so59854374", topics = "so59854374")
    public void listen(String in) {
        System.out.println(in);
    }

}

и

Before
foo
After

РЕДАКТИРОВАТЬ

Если вы добавив @Header("myMdcHeader") byte[] mdc в качестве дополнительного параметра к вашему методу слушателя kafka, вы можете использовать getArguments()[1] при вызове.

Другое решение - добавить RecordInterceptor к фабрике контейнера слушателя, которая позволяет вам получить доступ необработанный ConsumerRecord перед его передачей в адаптер прослушивателя.

/**
 * An interceptor for {@link ConsumerRecord} invoked by the listener
 * container before invoking the listener.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 *
 * @author Gary Russell
 * @since 2.2.7
 *
 */
@FunctionalInterface
public interface RecordInterceptor<K, V> {

    /**
     * Perform some action on the record or return a different one.
     * If null is returned the record will be skipped.
     * @param record the record.
     * @return the record or null.
     */
    @Nullable
    ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record);

}
/**
 * Set an interceptor to be called before calling the listener.
 * Does not apply to batch listeners.
 * @param recordInterceptor the interceptor.
 * @since 2.2.7
 */
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
    this.recordInterceptor = recordInterceptor;
}

Если вы используете пакетный прослушиватель, Kafka предоставляет ConsumerInterceptor.

...