Spring Cloud Stream для Kafka с API потребителя / производителя ровно после того, как семантика с префиксом-идентификатора транзакции не работает должным образом - PullRequest
0 голосов
/ 29 марта 2019

У меня есть сценарий, где я вижу другое поведение.Как и в общей сложности 3 различных сервиса

  • Первый сервис будет прослушивать очередь Solace и выводить ее в kafka topic-1 (где транзакции включены)
  • Второй сервис будет слушать выше тему kafka-1 и запишите его в другую тему kafka-2 (где у нас нет ручных коммитов, транзакции разрешены для создания в другой теме, автоматическое смещение коммитов как false, а уровень изоляции установлен на read_commited) назад Удалить
  • ТретийСервис будет прослушивать из kafka topic-2 и записывать его обратно в очередь Solace (где у нас нет ручных коммитов, автоматическое смещение коммитов, так как false и изоляция. Уровень равен read_commited).

Теперь проблемапосле того, как я включил транзакцию и уровень изоляции во втором сервисе, я не могу читать сообщения, если я отключил транзакцию во втором сервисе, я могу прочитать все сообщения.

  • Можем ли мы включить транзакции и уровень изоляции в одном сервисе
  • Как это работает, если мой сервис просто производитель или потребитель (как EoS гарантирован для этих сервисов)

Отредактировано: Ниже показано, как выглядит мой yml

 - kafka:
   - binder:
     - transaction:
         - transaction-id-prefix:
       - brokers: 
         - configuration: 
               all my consumer properties (ssl, sasl)

Обновлен (yml с весенним облаком):

spring: 
  cloud.stream:
      bindings:
        input:
          destination: test_input
          content-type: application/json
          group: test_group
        output:
          destination: test_output
          content-type: application/json
      kafka.binder: 
          configuration: 
            isolation.level: read_committed
            security.protocol: SASL_SSL
            sasl.mechanism: GSSAPI
            sasl.kerberos.service.name: kafka
            ssl.truststore.location: jks
            ssl.truststore.password: 
            ssl.endpoint.identification.algorithm: null            
          brokers: broker1:9092,broker2:9092,broker3:9092
          auto-create-topics: false
          transaction:
            transaction-id-prefix: trans-2
            producer:
              configuration:
                retries: 2000
                acks: all
                security.protocol: SASL_SSL
                sasl.mechanism: GSSAPI
                sasl.kerberos.service.name: kafka
                ssl.truststore.location: jks
                ssl.truststore.password: 
                ssl.endpoint.identification.algorithm: null

Обновлен (yml с весенним кафкой):

spring:
  kafka:
    bootstrap-servers: broker1:9092,broker2:9092,broker3:9092
    consumer:
      properties:
        isolation.level: read_committed
        ssl.truststore.location: truststore.jks
        ssl.truststore.password: 
        security.protocol: SASL_SSL
        sasl.mechanism: GSSAPI
        sasl.kerberos.service.name: kafka
    producer:
      transaction-id-prefix: trans-2
      retries: 2000
      acks: all
      properties:
        ssl.truststore.location: truststore.jks
        ssl.truststore.password: 
        security.protocol: SASL_SSL
        sasl.mechanism: GSSAPI
        sasl.kerberos.service.name: kafka
    admin:
      properties:
        ssl.truststore.location: truststore.jks
        ssl.truststore.password: 
        security.protocol: SASL_SSL
        sasl.mechanism: GSSAPI
        sasl.kerberos.service.name: kafka

Обновлено с динамическим назначением

Caused by: java.lang.IllegalStateException: Cannot perform operation after producer has been closed
    at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:810) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:819) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.0.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:423) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:351) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:209) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]

опробовал оба подхода к проблеме с определителем динамического назначения: Динамический распознаватель назначения

1 Ответ

0 голосов
/ 29 марта 2019

у меня нормально работает;все они в одном приложении, но это не имеет значения ...

@SpringBootApplication
@EnableBinding(Channels.class)
public class So55419549Application {

    public static void main(String[] args) {
        SpringApplication.run(So55419549Application.class, args);
    }

    @Bean
    public IntegrationFlow service1(MessageChannel out1) {
        return IntegrationFlows.from(() -> "foo", e -> e
                    .poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
                .log(Level.INFO, m -> "s1 " + m.getPayload())
                .channel(out1)
                .get();
    }

    @StreamListener("in2")
    @SendTo("out2")
    public String service2(String in) {
        System.out.println("s2 " + in);
        return in.toUpperCase();
    }

    @StreamListener("in3")
    public void service3(String in) {
        System.out.println("s3 " + in);
    }

}

interface Channels {

    @Output
    MessageChannel out1();

    @Input
    MessageChannel in2();

    @Output
    MessageChannel out2();

    @Input
    MessageChannel in3();

}

и

spring:
  cloud:
    stream:
      bindings:
        out1:
          destination: topic1
        in2:
          group: s2
          destination: topic1
        out2:
          destination: topic2
        in3:
          group: s3
          destination: topic2
      kafka:
        binder:
          transaction:
            transaction-id-prefix: tx
        bindings:
          in2:
            consumer:
              configuration:
                isolation:
                  level: read_committed
          in3:
            consumer:
              configuration:
                isolation:
                  level: read_committed
  kafka:
    producer:
      # needed again here so boot declares a TM for us
      transaction-id-prefix: tx
      retries: 10
      acks: all
logging:
  level:
    org.springframework.kafka.transaction: debug

и

2019-03-29 12:57:08.345  INFO 75700 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   
    : s1 foo
2019-03-29 12:57:08.353 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-03-29 12:57:08.353 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@6790c874, txId=txs2.topic1.0]]
s2 foo
2019-03-29 12:57:08.357 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-03-29 12:57:08.358 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@820ef3d, txId=txs3.topic2.0]]
s3 FOO

EDIT

Привязка не включает синхронизацию транзакций в менеджере транзакций.В качестве обходного пути, добавьте

TransactionSynchronizationManager.setActualTransactionActive(true);

к своему @StreamListener.

Я открыл ошибка против связывателя.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...