Освобождение ресурсов, связанных с предметами - PullRequest
0 голосов
/ 08 июля 2019

Я реализую некоторую многоступенчатую конвейерную обработку с помощью RxJava3. Я использую несколько последовательных .concatMap вызовов для отдельных шагов. Как побочный эффект, шаги создают некоторые (большие) временные файлы, которые должны быть удалены как при ошибке, так и при успехе. Первый шаг передает файлы следующему. Я успешно использую Single.using, чтобы закрыть дескрипторы файлов, но не могу удалить файл таким образом, поскольку он исчезнет до того, как следующий шаг сможет его использовать. Удаление файла в doOnError на первом шаге и в using на втором шаге работает в большинстве случаев.

Однако существует угловой случай, когда файл «протекает», т.е. не удаляется: если второй шаг первого рабочего элемента завершается неудачно (выдает исключение) после , второй элемент завершил свой первый (первый concatMap) шаг, но еще не начатый, его второй шаг (второй concatMap), этот второй элемент находится в некотором промежуточном месте и не удаляется, так как он в настоящее время не захвачен ни в одном using scope.

Мой минимальный пример:

import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.io.File
import io.reactivex.Single
import java.io.FileOutputStream

fun main(args: Array<String>) {
    Observable.just(5, 4).subscribeOn(Schedulers.computation())
        .concatMapSingle { workItem ->
            val file = File("/tmp/foo/work$workItem")
            Single.using({ FileOutputStream(file) }, { oStream ->
                Single.just(oStream)
                    .subscribeOn(Schedulers.computation())
                    .map { os ->
                        println("Pretending to work on item $workItem")
                        os.write("File $workItem".toByteArray())

//                      if (Math.random () > 0.5) throw Exception ("Something else failed")

                        Thread.sleep(workItem.toLong() * 1000)  // Second work item should have a quicker first step than the second step of the first item
                        Pair(file, workItem)    // Propagate both file and item number
                    }.doOnError { println("Deleting ${file.absolutePath}"); file.delete() }
            }, { os -> println("Closing file ${file.absolutePath}"); os.close(); })
        }
        .concatMapSingle { data1 ->
            Single.using({ data1 }, { data2 ->
                Single.just(data2)
                    .subscribeOn(Schedulers.computation())
                    .map { data ->
                        val workItem = data.second
                        println("Continuing pretend work on item ${workItem}");

                        Thread.sleep(workItem.toLong() * 1000)

                        // ... More complex stuff here ...

                        if (workItem == 5) throw Exception("Something failed")
                    }
            }, { data -> println("Deleting ${data.first.absolutePath}"); data.first.delete(); })
        }.blockingSubscribe();
}

Если выброшено исключение, файл /tmp/foo/work4 не удаляется, так как рабочий элемент "4" ожидает обработки 1сек вторым concatMap. Выход:

Pretending to work on item 5
Closing file /tmp/foo/work5
Continuing pretend work on item 5
Pretending to work on item 4
Closing file /tmp/foo/work4
Deleting /tmp/foo/work5
Exception in thread "main" java.lang.RuntimeException: java.lang.Exception: Something failed
    at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
    [...]

Если выдается первое (закомментированное) исключение или исключение, все удаляется нормально. Проблема такая же с flatMap, но сложнее отладить, так как параллельно выполняется больше вещей.

Поэтому мой вопрос: могу ли я связать некоторую функцию «очистки» с элементами (здесь: 5, 4), которая всегда вызывается, когда этот элемент выходит из области видимости?

...