Как мне преобразовать эту конфигурацию Spring-интеграции из XML в Java? - PullRequest
0 голосов
/ 27 марта 2019

Этот конкретный фрагмент имеет смысл реализовать в приложении, а не в XML, потому что он является константой во всем кластере, а не локализован для одного задания.

Из анализа XSD он выглядит какxml для int-kafka:outbound-channel-adapter создает KafkaProducerMessageHandler.

Нет видимого способа установить канал, тему или большинство других атрибутов.

Примечание для потенциальных downvoters - (rant on)Я был RTFM в течение недели, и я более запутался, чем когда я начал.Мой выбор языка окончил от прилагательных до наречий, и я начинаю заимствовать слова из других языков.Ответ может быть там.Но если это так, он не может быть обнаружен простыми смертными.(разглагольствования)

Конфигурация XML:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="kafkaTemplate"
                                    auto-startup="false"
                                    channel="outbound-staging"
                                    topic="foo"
                                    sync="false"
                                    message-key-expression="'bar'"
                                    send-failure-channel="failures"
                                    send-success-channel="successes"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

Если это так, то я ожидаю, что конфигурация java будет выглядеть примерно так:

@Bean
public KafkaProducerMessageHandler kafkaOutboundChannelAdapter () {
    KafkaProducerMessageHandler result = new KafkaProducerMessageHandler(kafkaTemplate());

    result.set????? ();    // WTH?? No methods for most of the attributes?!!!

    return result;
}

РЕДАКТИРОВАТЬ: Дополнительная информация о решаемой проблеме высокого уровня

В рамках более крупного проекта, я пытаюсь реализовать пример учебника из https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#remote-partitioning, с поддержкой Кафки вместо JMSbacking.

Я считаю, что окончательный процесс интеграции должен выглядеть примерно так:

partitionHandler -> messagingTemplate -> outbound-запросы (DirectChannel) -> outbound-staging (KafkaProducerMessageHandler) -> kafka

kafka -> executeContainer (KafkaMessageListenerContainer) -> inboundKafkaRequests (KafkaMessageDrivenChannelAdapter) -> входящие запросы (DirectChannel) -> serviceActivator (StepExecutionRequestHandler) Переадресация получателя ()-> kafka

kafka -> replyContainer (KafkaMessageListenerContainer) -> inboundKafkaReplies (KafkaMessageDrivenChannelAdapter) -> входящие ответы (DirectChannel) -> обработчик разбиения

1 Ответ

2 голосов
/ 27 марта 2019

Не уверен, что вы имеете в виду, что они пропущены, но это то, что я вижу в исходном коде этого KafkaProducerMessageHandler:

public void setTopicExpression(Expression topicExpression) {
    this.topicExpression = topicExpression;
}

public void setMessageKeyExpression(Expression messageKeyExpression) {
    this.messageKeyExpression = messageKeyExpression;
}

public void setPartitionIdExpression(Expression partitionIdExpression) {
    this.partitionIdExpression = partitionIdExpression;
}

/**
 * Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record.
 * The resulting value should be a {@link Long} type representing epoch time in milliseconds.
 * @param timestampExpression the {@link Expression} for timestamp to wait for result
 * fo send operation.
 * @since 2.3
 */
public void setTimestampExpression(Expression timestampExpression) {
    this.timestampExpression = timestampExpression;
}

и т. Д.

У вас также есть доступ к установщикам суперклассов, например setSync() для вашего варианта XML.

input-channel не является MessageHandler ответственностью. Он идет к Endpoint и может быть настроен через @ServiceActivator вместе с этим @Bean.

См. Дополнительную информацию в Справочном руководстве по интеграции Core Spring: https://docs.spring.io/spring-integration/reference/html/#annotations_on_beans

Также в начале есть очень важная глава: https://docs.spring.io/spring-integration/reference/html/#programming-tips

Кроме того, может быть лучше рассмотреть использование Java DSL вместо прямого MessageHandler использования:

             Kafka
                .outboundChannelAdapter(producerFactory)
                .sync(true)
                .messageKey(m -> m
                        .getHeaders()
                        .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                .headerMapper(mapper())
                .partitionId(m -> 0)
                .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
                .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic))
                .get();

Дополнительная информация о Java DSL приведена в упомянутых документах Spring Integration Doc: https://docs.spring.io/spring-integration/reference/html/#java-dsl

...