Spring Data Stream OutOfMemoryError - PullRequest
       10

Spring Data Stream OutOfMemoryError

0 голосов
/ 09 января 2019

Я пытаюсь транслировать таблицу с 200-метровыми записями. Я получаю около 70 тыс., А затем получаю ошибку OutOfMemoryError. Вот мой код:

@Service
@Slf4j
public class Migrator  {

    @Autowired
    LegacyDataRepository legacyDataRepository;

    @Autowired
    Producer producer;

    @Transactional
    public void run() {

        AtomicInteger counter = new AtomicInteger(0);

        try (Stream<Data> stream = legacyDataRepository.readAll()) {

            stream.forEach(x -> {

                try {
                    // the producer converts the data to json and writes to kafka
                    producer.send(data);

                    int currentCount = counter.incrementAndGet();

                    if (currentCount % 1000 == 0)
                        log.info("Migrated {} rows", currentCount);

                } catch (JsonProcessingException e) {
                     log.error("Failed to send event", e);
                }
            });
        }
    }
}

А вот мой репозиторий:

@Repository
@Slf4j
public class LegacyDataRepository {

    @PersistenceContext
    private EntityManager em;

    public Stream<Data> readThreats() {
        return em.createNativeQuery(query, Data.class)
                .setHint(QueryHints.HINT_READONLY, true)
                .setHint(QueryHints.HINT_FETCH_SIZE, 50 )
                // .setFirstResult(0)
                // .setMaxResults(1000)
                .getResultStream();
    }
}

Из других примеров, которые я обнаружил, видно, как транслировать всю большую таблицу.

EDIT1: Я подозреваю, что @Transactional держит все записи, читаемые в потоке. Я добавил ссылку на EntityManager и добавил em.clear();, которая контролирует память. Я действительно хотел бы решить эту проблему лучше.

Как я могу прочитать поток без удержания сущностей в менеджере сущностей?

...