Весенний облачный поток кафки транзакций на стороне производителя - PullRequest
0 голосов
/ 29 сентября 2019

У нас есть приложение Spring Cloud Stream, использующее Kafka.Требование заключается в том, что на стороне производителя список сообщений должен быть помещен в раздел транзакции.Там нет потребителей для сообщений в том же приложении.Когда я инициировал транзакцию с использованием префикса spring.cloud.stream.kafka.binder.transaction.transaction-id, я столкнулся с ошибкой, что у диспетчера нет подписчика, а общее количество разделов, полученных по этой теме, меньше, чемтранзакция настроена.Приложение не может получить разделы для темы в режиме транзакции.Не могли бы вы сказать, если я что-то упустил.Завтра выложу подробные логи.

Спасибо

1 Ответ

0 голосов
/ 29 сентября 2019

Вам необходимо показать свой код и конфигурацию, а также версии, которые вы используете.

Транзакции только для производителя обсуждаются в документации .

Включите транзакции, установив в spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix непустое значение, например, tx-.При использовании в приложении процессора потребитель запускает транзакцию;любые записи, отправленные в потоке потребителя, участвуют в той же транзакции.Когда прослушиватель завершается нормально, контейнер слушателя отправляет смещение транзакции и фиксирует его.Общая фабрика производителей используется для настройки всех привязок производителей с использованием spring.cloud.stream.kafka.binder.transaction.producer. * Properties;индивидуальная привязка Свойства производителя Kafka игнорируются.

Если вы хотите использовать транзакции в исходном приложении или из некоторого произвольного потока для транзакции только для производителя (например, метод @Scheduled), вы должны получить ссылку на транзакциюфабрика производителя и определить с ее помощью bean-компонент KafkaTransactionManager.

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    return new KafkaTransactionManager<>(pf);
}

Обратите внимание, что мы получаем ссылку на подшивку, используя BinderFactory;используйте null в первом аргументе, когда настроен только один связыватель.Если настроено более одного подшивки, используйте имя подшивки для получения ссылки.Получив ссылку на связыватель, мы можем получить ссылку на ProducerFactory и создать менеджер транзакций.

Тогда вы просто поддержите обычную поддержку транзакций Spring, например TransactionTemplate или @Transactional, например:

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

Если вы хотите синхронизировать транзакции только для производителя с транзакциями из другого менеджера транзакций, используйте ChainedTransactionManager.

...