Производитель # initTransactions не работает с KafkaContainer - PullRequest
0 голосов
/ 20 марта 2019

Я пытаюсь отправить сообщения Кафке с транзакцией. Итак, я использую этот код:

 try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers)) {
            producer.initTransactions();
            producer.beginTransaction();
            Arrays.stream(messages).forEach(
                message -> producer.send(new ProducerRecord<>(KAFKA_INPUT_TOPIC, message)));
            producer.commitTransaction();
        }

...

private static Producer<Void, String> createProducer(String kafkaContainerBootstrapServers) {
        return new KafkaProducer<>(
            ImmutableMap.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainerBootstrapServers,
                ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
                ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
                ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()
            ),
            new VoidSerializer(),
            new StringSerializer());
    }

Если я использую локальную Кафку, она хорошо работает.

Но если я использую Kafka TestContainers, он зависает на producer.initTransactions():

private static final String KAFKA_VERSION = "4.1.1";

@Rule
public KafkaContainer kafka = new KafkaContainer(KAFKA_VERSION)
    .withEmbeddedZookeeper();

Как настроить KafkaContainer для работы с транзакциями?

Ответы [ 2 ]

1 голос
/ 25 марта 2019

Попробуйте использовать Kafka для JUnit вместо тестовых контейнеров Kafka.У меня была та же проблема с транзакциями, и я сделал их живыми таким образом.

Зависимость Maven, которую я использовал:

<dependency>
    <groupId>net.mguenther.kafka</groupId>
    <artifactId>kafka-junit</artifactId>
    <version>2.1.0</version>
    <scope>test</scope>
</dependency>
0 голосов
/ 26 марта 2019

Я получил исключение, используя Кафку для JUnit , как предложил @AntonLitvinenko.Мой вопрос об этом здесь .

Я добавил эту зависимость, чтобы исправить ее (см. проблема ):

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>2.12.0</version>
    <exclusions>
        <exclusion>
           <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
    <scope>test</scope>
</dependency>

Также я использовал2.0.1 версия для kafka-junit и kafka_2.11:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafkaVersion}</version>
    <scope>test</scope>
</dependency>
...