Spring Cloud Stream - Невозможно получить настроенный адрес брокера Кафки - PullRequest
0 голосов
/ 10 декабря 2018

Я пытаюсь зарегистрировать слушатель kafka вручную с помощью Spring Cloud Stream, однако у меня возникают некоторые проблемы при попытке подключиться к брокеру:

[Consumer clientId=consumer-1, groupId=h2r] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request
[Consumer clientId=consumer-1, groupId=h2r] Initiating connection to node localhost:9092 (id: -1 rack: null)
[Consumer clientId=consumer-1, groupId=h2r] Node -1 disconnected.
[Consumer clientId=consumer-1, groupId=h2r] Connection to node -1 could not be established. Broker may not be available.
[Consumer clientId=consumer-1, groupId=h2r] Give up sending metadata request since no node is available

Он пытается подключиться в localhost: 9092, но мойсервер находится на другом компьютере (192.168.1.200:9092), что я делаю не так в этой конфигурации:

@Service
public class TenantMessageConsumer {

private final String defaultEnterpriseSchema;
private final MailService mailService;
private final KafkaListenerContainerFactory containerFactory;
private final KafkaListenerEndpointRegistry registry;

public TenantMessageConsumer(String defaultEnterpriseSchema, MailService mailService, KafkaListenerContainerFactory containerFactory, KafkaListenerEndpointRegistry registry) {
    this.defaultEnterpriseSchema = defaultEnterpriseSchema;
    this.mailService = mailService;
    this.containerFactory = containerFactory;
    this.registry = registry;
    listen();
}


public void listen() {
    TenantMessageConsumer that=this;
    AbstractKafkaListenerEndpoint endpoint=new AbstractKafkaListenerEndpoint<String, Object>() {
        @Override
        protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container, MessageConverter messageConverter) {
            try {
                return new RecordMessagingMessageListenerAdapter(that,TenantMessageConsumer.class.getMethod("process",Object.class));
            } catch (NoSuchMethodException e) {
                return null;
            }
        }
    };
    endpoint.setId("tenant");
    endpoint.setTopics(defaultEnterpriseSchema);
    endpoint.setGroupId("h2r");
    registry.registerListenerContainer(endpoint,containerFactory);
}

public void process(Object message){
    if (message instanceof SimpleEmailMessage) {
        SimpleEmailMessage emailMessage = (SimpleEmailMessage) message;
        if (emailMessage.getContent().equals("reset-password"))
            mailService.sendPasswordResetMail(emailMessage);
    }
}
}

Предполагается получить эту конфигурацию:

spring:
    cloud:
        stream:
            kafka:
                binder:
                    brokers: 192.168.1.200

Итак,мне нужен способ получить настроенный адрес брокера и установить его в объекте конечной точки.

Важно

Поскольку название темы является динамическим, я не могу использоватьаннотации типа @ StreamListener.

1 Ответ

0 голосов
/ 10 декабря 2018

Вы не описали свою проблему и не предоставили какую-либо соответствующую информацию, такую ​​как трассировка стека, журналы и т. Д.(пожалуйста, сделайте это в будущем), но я попробую.

Вы абсолютно можете использовать @StreamListener и другие аннотации с поддержкой Spring Cloud Stream для динамических назначений .

Пожалуйста, просмотрите вышеупомянутый раздел и дайте нам знать, если вам все еще нужна помощь.

...