Сохраненная сущность в слушателе Spring Kafka не откатывается при транзакционном тесте - PullRequest
0 голосов
/ 01 января 2019

У меня есть @KafkaListener, который сохраняет новую сущность, используя JDBC-репозиторий Spring Data.Сохраненный объект не откатывается в тесте, помеченном @Transactional

@Service
public class KafkaConsumer {

    private final EntityRepository entityRepository;

    private CountDownLatch countDownLatch;

    public KafkaConsumer(EntityRepository entityRepository) {
        this.entityRepository = entityRepository;
    }

    @KafkaListener(topics = "topic", groupId = "group-id")
    public void consume(String id) {
        Entity entity = new Entity();
        entity.setId(id);
        entity.setNew(true);

        entityRepository.save(entity);

        if (countDownLatch != null) countDownLatch.countDown();
    }

    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
}

@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
public class SpringKafkaDataJdbcMysqlTransactionalTestApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, 1, "topic");

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Autowired
    private KafkaConsumer kafkaConsumer;

    @Autowired
    private EntityRepository entityRepository;

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafkaRule.getEmbeddedKafka().getBrokersAsString());
    }

    @Test
    public void test() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);

        kafkaConsumer.setCountDownLatch(countDownLatch);

        String id = UUID.randomUUID().toString();

        kafkaTemplate.send("topic", id);

        assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();

        assertThat(entityRepository.findById(id)).isNotEmpty();
    }

}

Я выполняю это с использованием базы данных MySQL и ожидаю, что сохраненный объект будет откатан.Похоже, что транзакция, запущенная репозиторием, не присоединяется к транзакции, запущенной транзакцией в тесте, потому что это транзакция в отдельном потоке.

Как присоединиться к транзакции и откатить сущность?Кроме того, это хороший способ проверить это поведение?

Я повторил это здесь: https://github.com/yraydhitya/spring-kafka-data-jdbc-mysql-transactional-test

...