Я пытаюсь использовать Spring Cloud Stream для записи в топику Кафки c. Поскольку привязка на основе аннотаций теперь называется "устаревшей" , я пытаюсь использовать функциональную модель, но не вижу, как и не могу найти примеры для моего варианта использования.
У меня есть конечная точка REST, которую я должен регулярно опрашивать для элементов типа Input
. Он реактивный, поэтому это Supplier<Flux<Input>>
.
@Component
@RequiredArgsConstructor
class ForgeRestNotificationSupplier implements NotificationSupplier {
private final WebClient webClient;
@Override public Flux<Notification> get() {
return webClient
.get()
.uri("/endpoint")
.retrieve()
.bodyToMono(new ParameterizedTypeReference<List<Input>>() {})
.flatMapMany(Flux::fromIterable);
}
}
Теперь у меня есть картограф, который преобразует их в элементы типа Output
(используя mapstruct).
@Mapper(componentModel = "spring")
public interface InputOutputMapper {
@Mapping(target = "outputField", constant = "inputField")
Output toOutput(Input input);
}
и сопровождающий Конфигурация, чтобы представить его как Function<>
.
@Configuration class MappingConfiguration {
@Bean Function<Input, Output> mapper(InputOutputMapper mapper) {
return n -> mapper.toOutput(n);
}
}
В моей конфигурации я добавил связыватель для функции отображения, чтобы связать вывод с моим Kafka topi c.
spring.cloud.stream:
function:
definition: inputConsumer
bindings: inputConsumer-out-0:
binder: kafka
destination: outputTopic
kafka.binder.brokers: ${KAFKA_BROKERS}
Итак, теперь я хочу регулярно запрашивать конечную точку и записывать сопоставленные данные в kafka, но я не понимаю, как это сделать. Мой наивный подход, состоящий только в планировании метода и подключении поставщика и картографа вручную, конечно же, приводит к опросу, но не вызывает настроенную привязку.
@Scheduled(fixedRate = 1000)
public void poll() {
dataSupplier.get()
.map(inputConsumer)
.subscribe(m -> log.info("input received: {}", m),
err -> log.warn("Failed fetching input: {}, stack trace: {}", err.getMessage(), err.getStackTrace()),
() -> log.info("Finished processing input"));
}
Ссылка утверждает, что существует концепция за PolledConsumer
, но я не знаю, почему я бы опросил потребителя, а не поставщика, и я не могу понять, как это относится к тому, что я пытаюсь сделать, или даже вписывается в общую концепцию (поскольку здесь мы явно отправляем на вывод MessageChannel
вместо того, чтобы полагаться на настроенную целевую привязку).
Что я делаю не так? Я прихожу к выводу, что «наследие» - это более простой и понятный способ ведения дел.