Неправильные имена Kafka topi c для приложения Spring-Cloud-Function, развернутого как часть потока Spring-Cloud-Data-Flow - PullRequest
0 голосов
/ 28 апреля 2020

У меня есть простой поток SCDF, который выглядит следующим образом:

http --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp

mvmn-transform - это простой пользовательский преобразователь, который выглядит следующим образом:

@SpringBootApplication
@EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
    public static void main(String args[]) {
        SpringApplication.run(ScdfTestTransformer.class, args);
    }

    @Autowired
    protected ScdfTestTransformerProperties config;

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object transform(Message<?> message) {
        Object payload = message.getPayload();
        Map<String, Object> result = new HashMap<>();
        Map<String, String> headersStr = new HashMap<>();

        message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));

        result.put("headers", headersStr);
        result.put("payload", payload);
        result.put("configProp", config.getSomeConfigProp());

        return result;
    }

    // See https://stackoverflow.com/questions/59155689/could-not-decode-json-type-for-key-file-name-in-a-spring-cloud-data-flow-stream
    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        mapper.setEncodeStrings(true);
        return mapper;
    }
}

Это прекрасно работает.

Но я читал, что Spring Cloud Function должна позволять мне реализовывать такие приложения без необходимости указывать привязку и аннотации преобразователя, поэтому я изменил это на следующее:

@SpringBootApplication
// @EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
    public static void main(String args[]) {
        SpringApplication.run(ScdfTestTransformer.class, args);
    }

    @Autowired
    protected ScdfTestTransformerProperties config;

    // @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    @Bean
    public Function<Message<?>, Map<String, Object>> transform(
    // Message<?> message
    ) {
        return message -> {
            Object payload = message.getPayload();
            Map<String, Object> result = new HashMap<>();
            Map<String, String> headersStr = new HashMap<>();

            message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));

            result.put("headers", headersStr);
            result.put("payload", payload);
            result.put("configProp", "Config prop val: " + config.getSomeConfigProp());

            return result;
        };
    }

    // See https://stackoverflow.com/questions/59155689/could-not-decode-json-type-for-key-file-name-in-a-spring-cloud-data-flow-stream
    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        mapper.setEncodeStrings(true);
        return mapper;
    }
}

А теперь у меня проблема - имена источника и цели SCDF c явно игнорируются Spring-Cloud-Function, и вместо этого создаются темы transform-in-0 и transform-out-0.

SCDF создает темы с именами, такими как <stream-name>.<app-name>, например, что-то вроде TestStream123.http и TestStream123.mvmn-transform

Ранее они использовались для преобразования - как и должно быть, поскольку оно является частью потока SCDF. Но теперь они игнорируются Spring-Cloud-Function, и вместо них создаются transform-in-0 и transform-out-0.

Таким образом, мой преобразователь больше не получает никаких входных данных, так как ожидает его от неправильной Kafka topi c , И, вероятно, также не будет выводить данные в поток, поскольку он также выводит неверную информацию Kafka topi c.

PS На всякий случай полный код проекта на GitHub: https://github.com/mvmn/scdftest-transformer/tree/scfunc

Чтобы запустить локально, запустите консоль Kafka, Skipper, SCDF и SCDF, выполните mvn clean install в папке приложения, а затем app register --name mvmn-transform-1 --type processor --uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT --metadata-uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT в консоли. Затем вы можете развернуть поток, используя определение http --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp

1 Ответ

2 голосов
/ 29 апреля 2020

Поскольку вы используете функциональную модель написания приложений Spring Cloud Stream, при развертывании этого приложения вам нужно передать два свойства в пользовательский процессор для восстановления поведения Spring Cloud Data Flow.

spring.cloud.stream.function.bindings.transform-in-0=input spring.cloud.stream.function.bindings.transform-out-0=output

Можете ли вы попробовать и посмотреть, если это имеет значение?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...