Как опросить конечную точку REST и написать в Kafka, используя Spring Cloud Stream 3.0? - PullRequest
0 голосов
/ 05 февраля 2020

Я пытаюсь использовать Spring Cloud Stream для записи в топику Кафки c. Поскольку привязка на основе аннотаций теперь называется "устаревшей" , я пытаюсь использовать функциональную модель, но не вижу, как и не могу найти примеры для моего варианта использования.

У меня есть конечная точка REST, которую я должен регулярно опрашивать для элементов типа Input. Он реактивный, поэтому это Supplier<Flux<Input>>.

@Component
@RequiredArgsConstructor
class ForgeRestNotificationSupplier implements NotificationSupplier {
    private final WebClient webClient;

    @Override public Flux<Notification> get() {
        return webClient
                .get()
                .uri("/endpoint")
                .retrieve()
                .bodyToMono(new ParameterizedTypeReference<List<Input>>() {})
                .flatMapMany(Flux::fromIterable);
    }
}

Теперь у меня есть картограф, который преобразует их в элементы типа Output (используя mapstruct).

@Mapper(componentModel = "spring")
public interface InputOutputMapper {
    @Mapping(target = "outputField", constant = "inputField")
    Output toOutput(Input input);
}

и сопровождающий Конфигурация, чтобы представить его как Function<>.

@Configuration class MappingConfiguration {
    @Bean Function<Input, Output> mapper(InputOutputMapper mapper) {
        return n -> mapper.toOutput(n);
    }
}

В моей конфигурации я добавил связыватель для функции отображения, чтобы связать вывод с моим Kafka topi c.

spring.cloud.stream:
  function:
    definition: inputConsumer
    bindings: inputConsumer-out-0:
       binder: kafka
       destination: outputTopic
  kafka.binder.brokers: ${KAFKA_BROKERS}

Итак, теперь я хочу регулярно запрашивать конечную точку и записывать сопоставленные данные в kafka, но я не понимаю, как это сделать. Мой наивный подход, состоящий только в планировании метода и подключении поставщика и картографа вручную, конечно же, приводит к опросу, но не вызывает настроенную привязку.

@Scheduled(fixedRate = 1000)
public void poll() {
    dataSupplier.get()
            .map(inputConsumer)
            .subscribe(m -> log.info("input received: {}", m),
                    err -> log.warn("Failed fetching input: {}, stack trace: {}", err.getMessage(), err.getStackTrace()),
                    () -> log.info("Finished processing input"));
}

Ссылка утверждает, что существует концепция за PolledConsumer, но я не знаю, почему я бы опросил потребителя, а не поставщика, и я не могу понять, как это относится к тому, что я пытаюсь сделать, или даже вписывается в общую концепцию (поскольку здесь мы явно отправляем на вывод MessageChannel вместо того, чтобы полагаться на настроенную целевую привязку).

Что я делаю не так? Я прихожу к выводу, что «наследие» - это более простой и понятный способ ведения дел.

...