Как передать объект, связанный с сообщением, через адаптер исходящего канала - PullRequest
0 голосов
/ 15 декабря 2018

У меня есть следующее:

[inbound channel adapter] -> ... -> foo -> [outbound channel adapter] -> bar

Как я могу написать свое приложение для интеграции с пружиной, чтобы foo мог добавить дополнительный объект, который не является частью сообщения, которое [outbound channel adapter] будет использовать, напримерчто bar получает?

Мое приложение в основном получает сообщения от AWS SQS (используя spring-integration-aws), выполняет некоторые фильтрации / преобразования, а затем публикует сообщение в Apache Kafka (используя spring-integration-kafka), и еслии только в случае успеха удаляет исходное сообщение из очереди SQS.

По этой причине, когда я получаю сообщение SQS, я хочу удержать объект дескриптора / подтверждения получения, преобразовать остальную часть сообщенияв сообщение Кафки, которое будет опубликовано, а затем, если это удастся, использовать этот дескриптор квитанции / объект подтверждения для удаления исходного сообщения.

Итак, я использую этот пример кода из spring-integration-kafka документов:

@Bean
@ServiceActivator(inputChannel = "toKafka", outputChannel = "result")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    handler.setFailureChannel(failures());
    return handler;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaProducerFactory<>(props);
}

С учетом вышеизложенного, если у меня есть сообщение message и некоторая дополнительная, не связанная информация extra, что я отправляю на канал toKafka, чтобы handlerбудет потреблять message, и если это было успешно, канал result получит extra?

1 Ответ

0 голосов
/ 15 декабря 2018

Адаптеры исходящих каналов не выдают выходных данных - они только односторонние и завершают поток.

Вы можете сделать toKafka a PublishSubscribeChannel и добавить второй активатор службы;по умолчанию второе вызывается только в том случае, если первое успешно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...