Я настроил простой поток Spring Integration, который состоит из следующих шагов:
- периодически опрашивает остальные API, затем
- выполняет некоторую обработку полезной нагрузки
- и приземлите его на тему Кафки.
Пожалуйста, соблюдайте приведенный ниже код:
@Component
public class MyIntegrationFlow extends IntegrationFlowAdapter {
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(() -> List.of("pathVariable1", "pathVariable2"), c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
.split()
.handle(httpRequest(), c -> c.advice(new RequestHandlerRetryAdvice()))
.transform(Tranformers.fromJson(Foo.class))
.filter(payload -> payload.isValid())
.log()
.transform(Tranformers.toJson())
.channel(Source.OUTPUT); // output channel for kafka topic
}
private HttpMessageHandlerSpec httpRequest() {
return Http.outboundGateway("http://somehost:8080/{pathVariable}")
.httpMethod(GET)
.uriVariable("pathVariable", Message::getPayload)
.expectedResponseType(String.class);
}
}
Это прекрасно работает, однако я изо всех сил пытаюсь придумать несколько хороших тестов.,
- Как я должен издеваться над внешним REST API?
- Как мне проверить, что политика повторных попыток срабатывает и выполняется желаемое количество http-запросов?
- Как изменить
MessageSource
потока (списка переменных пути), который периодически опрашивается? - Как проверить, успешно ли полезная нагрузка попала в тему Kafka?