У меня есть сценарий, где я вижу другое поведение.Как и в общей сложности 3 различных сервиса
- Первый сервис будет прослушивать очередь Solace и выводить ее в kafka topic-1 (где транзакции включены)
- Второй сервис будет слушать выше тему kafka-1 и запишите его в другую тему kafka-2 (где у нас нет ручных коммитов, транзакции разрешены для создания в другой теме, автоматическое смещение коммитов как false, а уровень изоляции установлен на read_commited) назад Удалить
- ТретийСервис будет прослушивать из kafka topic-2 и записывать его обратно в очередь Solace (где у нас нет ручных коммитов, автоматическое смещение коммитов, так как false и изоляция. Уровень равен read_commited).
Теперь проблемапосле того, как я включил транзакцию и уровень изоляции во втором сервисе, я не могу читать сообщения, если я отключил транзакцию во втором сервисе, я могу прочитать все сообщения.
- Можем ли мы включить транзакции и уровень изоляции в одном сервисе
- Как это работает, если мой сервис просто производитель или потребитель (как EoS гарантирован для этих сервисов)
Отредактировано: Ниже показано, как выглядит мой yml
- kafka:
- binder:
- transaction:
- transaction-id-prefix:
- brokers:
- configuration:
all my consumer properties (ssl, sasl)
Обновлен (yml с весенним облаком):
spring:
cloud.stream:
bindings:
input:
destination: test_input
content-type: application/json
group: test_group
output:
destination: test_output
content-type: application/json
kafka.binder:
configuration:
isolation.level: read_committed
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
ssl.truststore.location: jks
ssl.truststore.password:
ssl.endpoint.identification.algorithm: null
brokers: broker1:9092,broker2:9092,broker3:9092
auto-create-topics: false
transaction:
transaction-id-prefix: trans-2
producer:
configuration:
retries: 2000
acks: all
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
ssl.truststore.location: jks
ssl.truststore.password:
ssl.endpoint.identification.algorithm: null
Обновлен (yml с весенним кафкой):
spring:
kafka:
bootstrap-servers: broker1:9092,broker2:9092,broker3:9092
consumer:
properties:
isolation.level: read_committed
ssl.truststore.location: truststore.jks
ssl.truststore.password:
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
producer:
transaction-id-prefix: trans-2
retries: 2000
acks: all
properties:
ssl.truststore.location: truststore.jks
ssl.truststore.password:
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
admin:
properties:
ssl.truststore.location: truststore.jks
ssl.truststore.password:
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
Обновлено с динамическим назначением
Caused by: java.lang.IllegalStateException: Cannot perform operation after producer has been closed
at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:810) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:819) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.0.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:423) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:351) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:209) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
опробовал оба подхода к проблеме с определителем динамического назначения: Динамический распознаватель назначения