Объект Neo4J не сохраняется при втором сохранении с помощью KafkaListener - PullRequest
0 голосов
/ 15 мая 2018

У меня есть производитель, который отправляет какой-то объект в Kafka (Spring Boot Application). У меня есть потребитель с настроенным neo4j (я использую Spring-Data с Spring Boot)

Код слушателя:

@Component
@Slf4j
public class KafkaReceiver {

  @Autowired
  AssetService assetService;

  @Transactional
  @KafkaListener(topics = "portfolio")
  public void receive(ConsumerRecord<?,?> consumerRecord) throws Exception{
    log.debug("################## START RECEIVE ");
    String value = consumerRecord.value().toString();
    ObjectMapper objectMapper = new ObjectMapper();
    log.debug("@@@@@@@ OBJECTS BEFORE : " + assetService.count());
    Asset asset = objectMapper.readValue(value, Asset.class);
    Asset s = assetService.addAsset(asset);
    log.debug("@@@@@@@ OBJECTS AFTER : " + assetService.count());

    log.debug("################## END RECEIVE ");
  }
 }

и код AssetService:

   @Service
   public class AssetService {
      @Autowired
      AssetRepository assetRepository;

      @Autowired
      Session neo4jSession;

      @Autowired
      private SessionFactory sessionFactory;

      @Transactional(isolation = Isolation.READ_COMMITTED)
      public Asset addAsset(Asset asset){
        Asset s = assetRepository.save(asset);
        return s;
      }

      @Transactional(readOnly = true)
      public long count(){
        return assetRepository.count();
       }
   }

(Полный код также доступен на github: https://github.com/jonleb/assetmanager)

Когда я запускаю приложение,В первый раз, когда слушатель использует сервис, данные сохраняются в neo4j, но потом это не так. Мне нужно перезапустить приложение, чтобы снова сохранить объект. Я не понимаю, почему. Я пытаюсь изменить транзакцию, но после 1день исследований У меня нет никакой новой идеи.

Не могли бы вы мне помочь?

Спасибо

...