У меня есть @KafkaListener
, который сохраняет новую сущность, используя JDBC-репозиторий Spring Data.Сохраненный объект не откатывается в тесте, помеченном @Transactional
@Service
public class KafkaConsumer {
private final EntityRepository entityRepository;
private CountDownLatch countDownLatch;
public KafkaConsumer(EntityRepository entityRepository) {
this.entityRepository = entityRepository;
}
@KafkaListener(topics = "topic", groupId = "group-id")
public void consume(String id) {
Entity entity = new Entity();
entity.setId(id);
entity.setNew(true);
entityRepository.save(entity);
if (countDownLatch != null) countDownLatch.countDown();
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
}
@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
public class SpringKafkaDataJdbcMysqlTransactionalTestApplicationTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, 1, "topic");
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private KafkaConsumer kafkaConsumer;
@Autowired
private EntityRepository entityRepository;
@BeforeClass
public static void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafkaRule.getEmbeddedKafka().getBrokersAsString());
}
@Test
public void test() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
kafkaConsumer.setCountDownLatch(countDownLatch);
String id = UUID.randomUUID().toString();
kafkaTemplate.send("topic", id);
assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(entityRepository.findById(id)).isNotEmpty();
}
}
Я выполняю это с использованием базы данных MySQL и ожидаю, что сохраненный объект будет откатан.Похоже, что транзакция, запущенная репозиторием, не присоединяется к транзакции, запущенной транзакцией в тесте, потому что это транзакция в отдельном потоке.
Как присоединиться к транзакции и откатить сущность?Кроме того, это хороший способ проверить это поведение?
Я повторил это здесь: https://github.com/yraydhitya/spring-kafka-data-jdbc-mysql-transactional-test