Я пытаюсь отправить сообщения Кафке с транзакцией. Итак, я использую этот код:
try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers)) {
producer.initTransactions();
producer.beginTransaction();
Arrays.stream(messages).forEach(
message -> producer.send(new ProducerRecord<>(KAFKA_INPUT_TOPIC, message)));
producer.commitTransaction();
}
...
private static Producer<Void, String> createProducer(String kafkaContainerBootstrapServers) {
return new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainerBootstrapServers,
ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()
),
new VoidSerializer(),
new StringSerializer());
}
Если я использую локальную Кафку, она хорошо работает.
Но если я использую Kafka TestContainers, он зависает на producer.initTransactions()
:
private static final String KAFKA_VERSION = "4.1.1";
@Rule
public KafkaContainer kafka = new KafkaContainer(KAFKA_VERSION)
.withEmbeddedZookeeper();
Как настроить KafkaContainer для работы с транзакциями?