проблема выполнения сопрограммы некоторые сопрограммы не будут запускаться - PullRequest
3 голосов
/ 02 июня 2019

Я пытаюсь преобразовать и сбросить некоторые данные в базу данных в потоковом приложении, используя spring и kotlin. ограничения производительности заставили меня использовать сопрограммы и чанк для более быстрого выполнения преобразования и сохранения. Проблема в том, что размер входных данных не равен постоянным данным!

Моя работа Запланировано на выполнение с фиксированной задержкой:

@Scheduled(fixedDelay = 10_000)
fun flushToDb() {
    // some operations
    CoroutineScope(getDispatcherExec()).launch {
        flush(oldStat, isFlushing)
    }
}

и это моя функция приостановки:

private suspend fun flush(oldStat: Map<String, AppDailyStatsModel>, isFlushing: AtomicBoolean) {
    logger.info("RedisFlush ${oldStat.size} starts")
    val start = System.currentTimeMillis()
    val count = AtomicLong(0)
    val startCount = AtomicLong(0)
    val newStat = AtomicLong(0)
    val updateStat = AtomicLong(0)
    val input = oldStat.values
    coroutineScope {

        input
                .chunked(1000)
                .forEach { models ->
                    launch {
                        try {
            startCount.addAndGet(models.size.toLong())
                            // Transform

            // Persist
                            count.addAndGet(models.size.toLong())
                            logger.info("Flushed  ${count.get()}")
                        } catch (e: Exception) {
                            logger.error("PROBLEM IN SAVE ", e)
                        }
                    }
                }
    }
}

Проблема в том, что count и startCount равны, и они меньше input !!

любая помощь будет высоко ценится!

...