У меня две подписки Rx Java, которые связаны вместе, поскольку вторая подписка зависит от результатов первой подписки. Bugsnag сообщает об ошибке ConcurrencyModification sh в одной из функций расширения массива Kotlin. Я знаю, что это основная c ошибка итерации массива .next (). Поскольку это встроенная функция, я не могу сказать, какая функция вызывает проблему в моем коде, поскольку эта информация неправильно передается в Bugsnag, и я не смог воспроизвести, что позволило бы мне локально увидеть, какая строка является проблемой. Я пробовал несколько вещей, предложенных в Интернете, но безрезультатно.
Обе подписки являются результатом Observables.combineLatest()
. Первая подписка полагается на две внутренние наблюдаемые и одну внутреннюю BehaviorSubject
, которая используется для получения значения из намерения фрагмента, все эти logi c лежат в ViewModel
.
Internal BehaviorSubject onNext()
, recommendedCourseFromPlacementTest(courseLevel: Int)
вызывается из фрагмента:
fun recommendedCourseFromPlacementTest(courseLevel: Int?) {
schedulerProvider.computation().scheduleDirect {
internalSuggestedCourseLevel.onNext(courseLevel ?: 0)
}
Тема internalSuggestedCourseLevel
объединяется с двумя упомянутыми наблюдаемыми объектами из серверной части этой подписки :
Observables.combineLatest(repo.getAllCourseImages(), stateRepo.observeContentState(), **internalSuggestedCourseLevel**)
.subscribeOn(schedulerProvider.computation())
.subscribe { triple ->
synchronized(triple) {
val images = triple.first
val contentState = triple.second
var suggestedCourseLevel = triple.third
val simpleCourseStatusList = contentState.zip(images) { state: CourseState, courseImages: CourseProgressImages ->
SimpleCourseStatus(id = state.id,
backgroundImageUrl = if (state.hasProgress) courseImages.inProgressImage else courseImages.noProgressImage,
)
}
val showSuggestedFromPlacement = simpleCourseStatusList.count { it.hasProgress } == 0
if (showSuggestedFromPlacement && suggestedCourseLevel > 0) {
simpleCourseStatusList.find { it.level == suggestedCourseLevel }?.apply {
showOutline = true
}
sendAdapterPosition(suggestedCourseLevel)
} else {
val shouldShowNextCourseFromProgress = simpleCourseStatusList.find { it.hasProgress && !it.isCompleted } == null
if (shouldShowNextCourseFromProgress) {
val lastCompletedCourseLevel = simpleCourseStatusList.findLast { it.isCompleted }?.level ?: 0
simpleCourseStatusList.find { !it.hasProgress && it.level > lastCompletedCourseLevel }?.apply {
showOutline = true
suggestedCourseLevel = this.level
}
}
}
internalSimpleCourseStatus.onNext(simpleCourseStatusList)
}
}
}
logi c в этой подписке создает список, который создается internalSimpleCourseStatus
. internalSimpleCourseStatus
- одна из наблюдаемых, используемых для второй подписки. Вот почему я называю их «прикованными». Вторая подписка объединяет internalSimpleCourseStatus
с двумя другими наблюдаемыми (аналогично первой подписке).:
Observables.combineLatest(**internalSimpleCourseStatus**,
repo.getAllSimpleCourseContent(), **subscriptionRepo.isContentUnlocked())**
.subscribeOn(schedulerProvider.computation())
.subscribe { triple ->
synchronized(triple) {
triple.hashCode()
val courseStatus = triple.first
val courseContent = triple.second
val contentUnlocked = triple.third
val zippedContentAndStatus = courseStatus.zip(courseContent) { status, content ->
SimpleCourse(id = content.id,
index = content.index,
name = content.name,
level = content.level,
numOfCompositions = content.numOfCompositions,
numOfArtists = content.numOfArtists,
outlineColorString = content.outlineColorString,
headerColorString = content.headerColorString,
levelDescription = if (!status.hasProgress) content.levelDescription else "",
isLocked = content.isLockable && !contentUnlocked,
compositionsCompleted = status.compositionsDone,
placeHolderResId = if(status.hasProgress) R.drawable.courses_error_inprogress else R.drawable.courses_error_noprogress_nofocus
)
}
allCourseDataLiveData.postValue(zippedContentAndStatus)
}
}
Эти точные фрагменты кода работали отлично, пока я не добавил функцию recommendedCourseFromPlacementTest
, которая передает в internalSuggestedCourseLevel
тема. Сообщение об исключении из Bugsnag:
Caused By: java.util.ConcurrentModificationException ·
LevelsViewModel.kt:167com.trala.learn.violin.viewmodel.content.LevelsViewModel$startObservingForComponentId$$inlined$let$lambda$1.accept
LevelsViewModel.kt:21com.trala.learn.violin.viewmodel.content.LevelsViewModel$startObservingForComponentId$$inlined$let$lambda$1.accept
LevelsViewModel.kt:131com.trala.learn.violin.viewmodel.content.LevelsViewModel.recommendedCourseFromPlacementTest
LevelsFragment.kt:104com.trala.learn.violin.view.content.LevelsFragment.setRecommendedCourse
LevelsFragment.kt:95com.trala.learn.violin.view.content.LevelsFragment.onViewCreated
В других случаях в исключении упоминается наблюдаемое из subscriptionRepo.isContentUnlocked()
, а испускаемое internalSuggestedCourseLevel
вызывает быстрое срабатывание второй подписки два раза подряд, что вызывает исключение ConcurrentModification Exception (или я так думаю, что это причина).
Я пробовал следующее, чтобы исправить эту проблему:
- Убедился, что все отправлено / подписано в одном потоке
schedulerProvider.computation()
. - Убедитесь, что BehaviorSubjects сериализованы при инициализации, ie
private var internalSuggestedCourseLevel = BehaviorSubject.create<Int>().toSerialized()
. - Пробовал синхронизировать logi c в подписках. Вот синхронизирую с
synchronized(triple)
. Я даже не знаю, возможно ли это. Я пробовал синхронизировать другие объекты с таким же результатом.
Есть ли способ синхронизировать код? Я не делаю никаких удалений или добавлений List
, поэтому я даже не понимаю, как / почему выбрасывается ConcurrentModificationException
. Я также не получаю никаких исключений противодавления, чтобы сказать, что мой потребитель не обрабатывает выбросы достаточно быстро.