Spring Embedded Kafka с использованием Kafka 2.2.x - PullRequest
0 голосов
/ 07 октября 2019

Встроенный брокер Kafka для Spring не работает при обновлении Spring-Kafka 2.2.x, что требуется для kafka-client и kafka-stream 2.2.3. Кажется, он требует файл meta.properties в каталоге log.dir и требует установки broker.id. Однако это вызывает исключение для связывающего адреса. Кто-нибудь заставил это работать?

        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:391)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:312)
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:120)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
    ... 26 common frames omitted
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafkaBroker' defined in com.talentreef.notification.legacy.streams.topology.ApplicantPoolStreamTest$ApplicantPoolStreamTestConfig: Invocation of init method failed; nested exception is org.apache.kafka.common.KafkaException: Socket server failed to bind to localhost:50659: Address already in use.
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1778)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:593)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
    at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:277)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1251)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1171)
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:593)
    ... 44 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Socket server failed to bind to localhost:50659: Address already in use.
    at kafka.network.Acceptor.openServerSocket(SocketServer.scala:573)
    at kafka.network.Acceptor.<init>(SocketServer.scala:451)
    at kafka.network.SocketServer.kafka$network$SocketServer$$createAcceptor(SocketServer.scala:245)
    at kafka.network.SocketServer$$anonfun$createDataPlaneAcceptorsAndProcessors$1.apply(SocketServer.scala:215)
    at kafka.network.SocketServer$$anonfun$createDataPlaneAcceptorsAndProcessors$1.apply(SocketServer.scala:214)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:214)
    at kafka.network.SocketServer.startup(SocketServer.scala:114)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:253)
    at kafka.utils.TestUtils$.createServer(TestUtils.scala:140)
    at kafka.utils.TestUtils.createServer(TestUtils.scala)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:223)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1837)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1774)
    ... 54 common frames omitted
Caused by: java.net.BindException: Address already in use
    at java.base/sun.nio.ch.Net.bind0(Native Method)
    at java.base/sun.nio.ch.Net.bind(Net.java:461)
    at java.base/sun.nio.ch.Net.bind(Net.java:453)
    at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:227)
    at java.base/sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:80)
    at java.base/sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:73)
    at kafka.network.Acceptor.openServerSocket(SocketServer.scala:569)
    ... 68 common frames omitted

broker.properties

listeners=PLAINTEXT://localhost:9092
log.dir=/tmp/data/logs
auto.create.topics.enable=true
port=9092

Образец теста

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SomeSampleTestConfig.class)
@DirtiesContext
@EmbeddedKafka(
        partitions = 1,
        topics = {
                "someRandomTopic"},
        brokerPropertiesLocation = "classpath:/broker.properties"
)
public class SampleStreamTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    public void shouldTestStream() {

    }
}

1 Ответ

0 голосов
/ 07 октября 2019

listeners=PLAINTEXT://localhost:9092

Caused by: java.net.BindException: Address already in use

Нельзя использовать тот же порт, если у вас уже есть брокер, прослушивающий 9092

Вы также должны переопределить все версии jar kafka, как обсуждено в документации .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...