Сила, чтобы остановить весеннюю загрузку, когда она закончилась - PullRequest
0 голосов
/ 30 мая 2018

на самом деле у меня есть приложение Spring Boot, и я использую встроенную kafka без использования spring-kafka, но с использованием слившейся библиотеки.

Я настроил собственную топологию, и когда я запускаю junit, потокостается слушать, а сервер весенней загрузки не заканчивается.

Я попытался использовать @DirtiesContext, но проблема остается.

Для запуска junit я использую

@RunWith(SpringRunner.class)
@SpringBootTest

Когда потребитель находится в цикле, я вижу это сообщение в консоли:

[Producer clientId = application1-3c4587c8-23f0-4c8b-8ef0-75bc1e0f966c-StreamThread-1-Manufacturer] Соединение с узлом -1не может быть установлено.Брокер может быть недоступен.

Советы?

Спасибо

1 Ответ

0 голосов
/ 13 июня 2018

Кажется, что ошибка происходит, потому что посредник не работает.Но KafkaStreams работает.

Чтобы исправить это, я думаю, что вам нужно запретить запуск KafkaStreams при тестировании.

Аннотация @SpringBootTest предоставляет классы атрибутов.Вы можете указать классы, которые будут зарегистрированы как bean-компоненты

. Вы можете запретить регистрацию связанных bean-компонентов KafkaStreams, указав атрибут классов.

Например, вы можете выполнить тестирование следующим образом.В этом случае описанная выше проблема не возникнет, потому что связанные компоненты KafkaStreams не зарегистрированы.

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {ServiceImpl.class, CommonConfig.class})
public class SomeClassTest {
    @Autowired
    private ServiceImpl articleServiceImpl;

    // do test
}

Когда вы используете @SpringBootTest, лучше всего регистрировать только те компоненты, которые вы хотите протестировать.

Если вы не знакомы с этим подходом, попробуйте смоделировать bean-компонент, управляющий объектом KafkaStreams

Например, я создаю следующий bean-компонент для запуска и завершения объекта KafkaStreams.

@Component
public class ManageableStream implements DisposableBean, InitializingBean {

    private final KafkaStreams kafkaStreams;

    public ManageableStream() {
        StreamsConfig config = buildStreamConfig();
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> inputStream = builder.stream("source.topic");
        inputStream.to("destination.topic");

        Topology topology = builder.build();
        kafkaStreams = new KafkaStreams(topology, config);
    }

    @Override
    public void destroy() throws Exception {
        kafkaStreams.close();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        kafkaStreams.start();
    }

    private StreamsConfig buildStreamConfig() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-stream-application");
        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        return new StreamsConfig(properties);
    }
}

При тестировании макет bean-компонента, который управляет объектом KafkaStreams.

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @MockBean
    private ManageableStream manageableStream;

    @Test
    public void contextLoads() {
    }

}

Тогда KafkaStreams не запустится, поэтому описанная выше проблема не возникнет.

Если вы хотите проверить топологию KafkaStreamsПожалуйста, смотрите ссылку ниже

https://kafka.apache.org/11/documentation/streams/developer-guide/testing.html

Надеюсь, мой ответ был полезен

...