Я использую библиотеку Confluent Kafka Go для своего приложения Golang.Я использую AdminClient.GetMetadata
, чтобы перечислить все темы в кластере Kafka.Вот мой код для проверки
topic1 := fmt.Sprintf("test-get-all-topic-%d-%d", 1, randomInt())
err = adminClient.CreateTopic(topic1, numPartition, replicationFactor, map[string]string{})
topic2 := fmt.Sprintf("test-get-all-topic-%d-%d", 2, randomInt())
err = adminClient.CreateTopic(topic2, numPartition, replicationFactor, map[string]string{})
topics, err = adminClient.GetAllTopics()
Функция, которую я использую для получения списка всех тем:
admin.GetMetadata(nil, true, 2000)
Проблема с приведенным выше кодом заключается в следующем: когда я вызываю функциюGetAllTopics
, иногда topic2
еще не было доступно.Это делает мой модульный тест очень легко провалиться.Если я посплю перед GetAllData
и посплю около 1 секунды, все будет работать.
Но этот способ действительно сложен.Я использовал конфигурацию «acks = all» для клиента администратора, но он тоже не работает.Я хочу спросить, есть ли какая-нибудь более безопасная функция для получения всех тем после успешного нажатия 1 на Кафку.