Я пытаюсь преобразовать и сбросить некоторые данные в базу данных в потоковом приложении, используя spring и kotlin. ограничения производительности заставили меня использовать сопрограммы и чанк для более быстрого выполнения преобразования и сохранения.
Проблема в том, что размер входных данных не равен постоянным данным!
Моя работа Запланировано на выполнение с фиксированной задержкой:
@Scheduled(fixedDelay = 10_000)
fun flushToDb() {
// some operations
CoroutineScope(getDispatcherExec()).launch {
flush(oldStat, isFlushing)
}
}
и это моя функция приостановки:
private suspend fun flush(oldStat: Map<String, AppDailyStatsModel>, isFlushing: AtomicBoolean) {
logger.info("RedisFlush ${oldStat.size} starts")
val start = System.currentTimeMillis()
val count = AtomicLong(0)
val startCount = AtomicLong(0)
val newStat = AtomicLong(0)
val updateStat = AtomicLong(0)
val input = oldStat.values
coroutineScope {
input
.chunked(1000)
.forEach { models ->
launch {
try {
startCount.addAndGet(models.size.toLong())
// Transform
// Persist
count.addAndGet(models.size.toLong())
logger.info("Flushed ${count.get()}")
} catch (e: Exception) {
logger.error("PROBLEM IN SAVE ", e)
}
}
}
}
}
Проблема в том, что count
и startCount
равны, и они меньше input
!!
любая помощь будет высоко ценится!