Android Исключение параллелизма в Rx Java Подписки - PullRequest
0 голосов
/ 19 июня 2020

У меня две подписки Rx Java, которые связаны вместе, поскольку вторая подписка зависит от результатов первой подписки. Bugsnag сообщает об ошибке ConcurrencyModification sh в одной из функций расширения массива Kotlin. Я знаю, что это основная c ошибка итерации массива .next (). Поскольку это встроенная функция, я не могу сказать, какая функция вызывает проблему в моем коде, поскольку эта информация неправильно передается в Bugsnag, и я не смог воспроизвести, что позволило бы мне локально увидеть, какая строка является проблемой. Я пробовал несколько вещей, предложенных в Интернете, но безрезультатно.

Обе подписки являются результатом Observables.combineLatest(). Первая подписка полагается на две внутренние наблюдаемые и одну внутреннюю BehaviorSubject, которая используется для получения значения из намерения фрагмента, все эти logi c лежат в ViewModel.

Internal BehaviorSubject onNext(), recommendedCourseFromPlacementTest(courseLevel: Int) вызывается из фрагмента:

 fun recommendedCourseFromPlacementTest(courseLevel: Int?) {

    schedulerProvider.computation().scheduleDirect {
        internalSuggestedCourseLevel.onNext(courseLevel ?: 0)
    }

Тема internalSuggestedCourseLevel объединяется с двумя упомянутыми наблюдаемыми объектами из серверной части этой подписки :

Observables.combineLatest(repo.getAllCourseImages(), stateRepo.observeContentState(), **internalSuggestedCourseLevel**)
                    .subscribeOn(schedulerProvider.computation())
                    .subscribe { triple ->

                synchronized(triple) {
                    val images = triple.first
                    val contentState = triple.second
                    var suggestedCourseLevel = triple.third


                    val simpleCourseStatusList = contentState.zip(images) { state: CourseState, courseImages: CourseProgressImages ->

                        SimpleCourseStatus(id = state.id,
                                backgroundImageUrl = if (state.hasProgress) courseImages.inProgressImage else courseImages.noProgressImage,

                        )

                    }

                    val showSuggestedFromPlacement = simpleCourseStatusList.count { it.hasProgress } == 0
                    if (showSuggestedFromPlacement && suggestedCourseLevel > 0) {
                        simpleCourseStatusList.find { it.level == suggestedCourseLevel }?.apply {
                            showOutline = true
                        }

                        sendAdapterPosition(suggestedCourseLevel)
                    } else {
                        val shouldShowNextCourseFromProgress = simpleCourseStatusList.find { it.hasProgress && !it.isCompleted } == null
                        if (shouldShowNextCourseFromProgress) {
                            val lastCompletedCourseLevel = simpleCourseStatusList.findLast { it.isCompleted }?.level ?: 0
                            simpleCourseStatusList.find { !it.hasProgress && it.level > lastCompletedCourseLevel }?.apply {
                                showOutline = true
                                suggestedCourseLevel = this.level
                            }
                        }
                    }

                    internalSimpleCourseStatus.onNext(simpleCourseStatusList)
                }

            }
}

logi c в этой подписке создает список, который создается internalSimpleCourseStatus. internalSimpleCourseStatus - одна из наблюдаемых, используемых для второй подписки. Вот почему я называю их «прикованными». Вторая подписка объединяет internalSimpleCourseStatus с двумя другими наблюдаемыми (аналогично первой подписке).:

Observables.combineLatest(**internalSimpleCourseStatus**,
                    repo.getAllSimpleCourseContent(), **subscriptionRepo.isContentUnlocked())**
                    .subscribeOn(schedulerProvider.computation())
                    .subscribe { triple ->

                synchronized(triple) {
                    triple.hashCode()
                    val courseStatus = triple.first
                    val courseContent = triple.second
                    val contentUnlocked = triple.third

                    val zippedContentAndStatus = courseStatus.zip(courseContent) { status, content ->
                        SimpleCourse(id = content.id,
                                index = content.index,
                                name = content.name,
                                level = content.level,
                                numOfCompositions = content.numOfCompositions,
                                numOfArtists = content.numOfArtists,
                                outlineColorString = content.outlineColorString,
                                headerColorString = content.headerColorString,
                                levelDescription = if (!status.hasProgress) content.levelDescription else "",
                                isLocked = content.isLockable && !contentUnlocked,                           
                                compositionsCompleted = status.compositionsDone,
                                placeHolderResId = if(status.hasProgress) R.drawable.courses_error_inprogress else R.drawable.courses_error_noprogress_nofocus

                        )
                    }

                    allCourseDataLiveData.postValue(zippedContentAndStatus)

                }

            }

Эти точные фрагменты кода работали отлично, пока я не добавил функцию recommendedCourseFromPlacementTest, которая передает в internalSuggestedCourseLevel тема. Сообщение об исключении из Bugsnag:

Caused By: java.util.ConcurrentModificationException ·
LevelsViewModel.kt:167com.trala.learn.violin.viewmodel.content.LevelsViewModel$startObservingForComponentId$$inlined$let$lambda$1.accept    
LevelsViewModel.kt:21com.trala.learn.violin.viewmodel.content.LevelsViewModel$startObservingForComponentId$$inlined$let$lambda$1.accept 
LevelsViewModel.kt:131com.trala.learn.violin.viewmodel.content.LevelsViewModel.recommendedCourseFromPlacementTest   
LevelsFragment.kt:104com.trala.learn.violin.view.content.LevelsFragment.setRecommendedCourse    
LevelsFragment.kt:95com.trala.learn.violin.view.content.LevelsFragment.onViewCreated

В других случаях в исключении упоминается наблюдаемое из subscriptionRepo.isContentUnlocked(), а испускаемое internalSuggestedCourseLevel вызывает быстрое срабатывание второй подписки два раза подряд, что вызывает исключение ConcurrentModification Exception (или я так думаю, что это причина).

Я пробовал следующее, чтобы исправить эту проблему:

  • Убедился, что все отправлено / подписано в одном потоке schedulerProvider.computation().
  • Убедитесь, что BehaviorSubjects сериализованы при инициализации, ie private var internalSuggestedCourseLevel = BehaviorSubject.create<Int>().toSerialized().
  • Пробовал синхронизировать logi c в подписках. Вот синхронизирую с synchronized(triple). Я даже не знаю, возможно ли это. Я пробовал синхронизировать другие объекты с таким же результатом.

Есть ли способ синхронизировать код? Я не делаю никаких удалений или добавлений List, поэтому я даже не понимаю, как / почему выбрасывается ConcurrentModificationException. Я также не получаю никаких исключений противодавления, чтобы сказать, что мой потребитель не обрабатывает выбросы достаточно быстро.

...