Кафка Топи c Модульное тестирование - PullRequest
0 голосов
/ 28 февраля 2020

Я создаю топи c в Кафке с помощью метода ниже,

public class  KafkaTopicAdmin {

  public void createTopic(final String topicName) {

        final AdminClient client = getKafkaClient();
        final List<NewTopic> topics = Collections.synchronizedList(new ArrayList<>());
        final NewTopic newTopic = new NewTopic(topicName, connectionConfig.getNoOfPartition(), (short) connectionConfig.getNoOfReplicas());
        topics.add(newTopic);
        client.createTopics(topics);

    }

private AdminClient getKafkaClient() {
        final Map<String, Object> configs = new ConcurrentHashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap-ip");
        return AdminClient.create(configs);
    }
}

, и мой тестовый класс,

@EmbeddedKafka
public class KafkaTopicAdminTest {

    private KafkaTopicAdmin kafkaTopicAdmin;
    private AdminClient kafkaAdminClient;

    @Before
    public void setUp(){

        kafkaTopicAdmin = new KafkaTopicAdmin();
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,  StringUtils.arrayToCommaDelimitedString(new EmbeddedKafkaBroker(2).getBrokerAddresses()));
        kafkaAdminClient = KafkaAdminClient.create(properties);
    }

    @Test
    public void shouldCreateTopic() throws ExecutionException, InterruptedException {
        kafkaTopicAdmin.createTopic("TestTopic");
        ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
        listTopicsOptions.listInternal(true);
        System.out.println("topics:" + kafkaAdminClient.listTopics(listTopicsOptions).names().get());
    }
}

Я получаю следующую ошибку,

[AdminClient clientId=adminclient-1] Error connecting to node 127.0.0.1:0 (id: -2 rack: null)
java.net.BindException: Can't assign requested address

Я использую @EmbeddedKafka Я просто хотел убедиться, что topi c присутствует в списке. Это правильный подход или любое другое предложение, пожалуйста?

1 Ответ

1 голос
/ 29 февраля 2020

Судя по ошибке, вы указали свойство bootstrap .servers без порта: 127.0.0.1:0

Это свойство также принимает порт вашего сервера Kafka bootstrap, который по умолчанию 9092, поэтому попробуйте это:

configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap-server-ip:9092");
...