Я пытаюсь транслировать таблицу с 200-метровыми записями. Я получаю около 70 тыс., А затем получаю ошибку OutOfMemoryError. Вот мой код:
@Service
@Slf4j
public class Migrator {
@Autowired
LegacyDataRepository legacyDataRepository;
@Autowired
Producer producer;
@Transactional
public void run() {
AtomicInteger counter = new AtomicInteger(0);
try (Stream<Data> stream = legacyDataRepository.readAll()) {
stream.forEach(x -> {
try {
// the producer converts the data to json and writes to kafka
producer.send(data);
int currentCount = counter.incrementAndGet();
if (currentCount % 1000 == 0)
log.info("Migrated {} rows", currentCount);
} catch (JsonProcessingException e) {
log.error("Failed to send event", e);
}
});
}
}
}
А вот мой репозиторий:
@Repository
@Slf4j
public class LegacyDataRepository {
@PersistenceContext
private EntityManager em;
public Stream<Data> readThreats() {
return em.createNativeQuery(query, Data.class)
.setHint(QueryHints.HINT_READONLY, true)
.setHint(QueryHints.HINT_FETCH_SIZE, 50 )
// .setFirstResult(0)
// .setMaxResults(1000)
.getResultStream();
}
}
Из других примеров, которые я обнаружил, видно, как транслировать всю большую таблицу.
EDIT1:
Я подозреваю, что @Transactional
держит все записи, читаемые в потоке. Я добавил ссылку на EntityManager
и добавил em.clear();
, которая контролирует память. Я действительно хотел бы решить эту проблему лучше.
Как я могу прочитать поток без удержания сущностей в менеджере сущностей?