У меня есть производитель, который отправляет какой-то объект в Kafka (Spring Boot Application). У меня есть потребитель с настроенным neo4j (я использую Spring-Data с Spring Boot)
Код слушателя:
@Component
@Slf4j
public class KafkaReceiver {
@Autowired
AssetService assetService;
@Transactional
@KafkaListener(topics = "portfolio")
public void receive(ConsumerRecord<?,?> consumerRecord) throws Exception{
log.debug("################## START RECEIVE ");
String value = consumerRecord.value().toString();
ObjectMapper objectMapper = new ObjectMapper();
log.debug("@@@@@@@ OBJECTS BEFORE : " + assetService.count());
Asset asset = objectMapper.readValue(value, Asset.class);
Asset s = assetService.addAsset(asset);
log.debug("@@@@@@@ OBJECTS AFTER : " + assetService.count());
log.debug("################## END RECEIVE ");
}
}
и код AssetService:
@Service
public class AssetService {
@Autowired
AssetRepository assetRepository;
@Autowired
Session neo4jSession;
@Autowired
private SessionFactory sessionFactory;
@Transactional(isolation = Isolation.READ_COMMITTED)
public Asset addAsset(Asset asset){
Asset s = assetRepository.save(asset);
return s;
}
@Transactional(readOnly = true)
public long count(){
return assetRepository.count();
}
}
(Полный код также доступен на github: https://github.com/jonleb/assetmanager)
Когда я запускаю приложение,В первый раз, когда слушатель использует сервис, данные сохраняются в neo4j, но потом это не так. Мне нужно перезапустить приложение, чтобы снова сохранить объект. Я не понимаю, почему. Я пытаюсь изменить транзакцию, но после 1день исследований У меня нет никакой новой идеи.
Не могли бы вы мне помочь?
Спасибо