Весенняя кафка. Не запускается EmbeddedKafkaBroker - PullRequest
1 голос
/ 25 марта 2020

Я кодирую Kafka Broker и Consumer для перехвата сообщений из приложения. При попытке получить сообщения от Потребителя возникает ошибка

java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:216)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:531)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:540)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.springframework.kafka.test.utils.KafkaTestUtils.getRecords(KafkaTestUtils.java:303)
    at org.springframework.kafka.test.utils.KafkaTestUtils.getRecords(KafkaTestUtils.java:280)

На стороне приложения (Производитель) также возникает ошибка соединения

2020-03-25 12:29:33.689  WARN 25786 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1, transactionalId=tx0] Connection to node -1 (<here broker hostname>:9092) could not be established. Broker may not be available.

Мой проект имеет следующие зависимости:

compile "org.springframework.kafka:spring-kafka-test:2.4.4.RELEASE"
compile "org.springframework.kafka:spring-kafka:2.4.4.RELEASE"

Код моего брокера Kafka

public class KafkaServer {

    private static final String BROKERPORT = "9092";
    private static final String BROKERHOST = "localhost";
    public static final String TOPIC1 = "fss-fsstransdata";
    public static final String TOPIC2 = "fss-fsstransscores";
    public static final String TOPIC3 = "fss-fsstranstimings";
    public static final String TOPIC4 = "fss-fssdevicedata";
    @Getter
    private Consumer<String, String> consumer;

    private EmbeddedKafkaBroker embeddedKafkaBroker;

    public void run() {

        String[] topics = {TOPIC1, TOPIC2, TOPIC3, TOPIC4};

        this.embeddedKafkaBroker = new EmbeddedKafkaBroker(
                1,
                false,
                1,
                topics
        ).kafkaPorts(BROKERPORT);

        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", this.embeddedKafkaBroker));
        this.consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();

        this.consumer.subscribe(Arrays.asList(topics));
    } 
}

Пожалуйста, помогите разобраться с ситуацией. Я не очень хорош в архитектуре кафки и в том, как она может быть реализована в Spring.

Ответы [ 2 ]

1 голос
/ 25 марта 2020

Если вы используете spring, вам нужно аннотировать ваш компонент с помощью @ EmbeddedKafka , а затем использовать @ Autowire на EmbeddedKafkaBroker

Пример конфигурации аннотации встроенной кафки:

@EmbeddedKafka(
    partitions = 1, 
    controlledShutdown = false,
    brokerProperties = {// place your proerties here
})

Что я хотел бы сделать, это создать пружинный компонент KafkaServerConfig и поместить все мои логи c для конфигурации и построения компонента внутри.

PS: он должен Следует отметить, что EmbeddedKafkaBroker предназначен для юнит-тестов.

0 голосов
/ 25 марта 2020

EmbeddedKafkaBroker предназначен для использования из контекста приложения Spring или JUnit4 @Rule или @ClassRule или JUnit5 Condition.

Чтобы использовать его вне этих сред, вы должны позвонить afterPropertiesSet(), чтобы инициализировать его, и destroy(), чтобы выключить его.

...