Я реализую некоторую многоступенчатую конвейерную обработку с помощью RxJava3. Я использую несколько последовательных .concatMap
вызовов для отдельных шагов. Как побочный эффект, шаги создают некоторые (большие) временные файлы, которые должны быть удалены как при ошибке, так и при успехе. Первый шаг передает файлы следующему. Я успешно использую Single.using
, чтобы закрыть дескрипторы файлов, но не могу удалить файл таким образом, поскольку он исчезнет до того, как следующий шаг сможет его использовать. Удаление файла в doOnError
на первом шаге и в using
на втором шаге работает в большинстве случаев.
Однако существует угловой случай, когда файл «протекает», т.е. не удаляется: если второй шаг первого рабочего элемента завершается неудачно (выдает исключение) после , второй элемент завершил свой первый (первый concatMap
) шаг, но еще не начатый, его второй шаг (второй concatMap
), этот второй элемент находится в некотором промежуточном месте и не удаляется, так как он в настоящее время не захвачен ни в одном using
scope.
Мой минимальный пример:
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.io.File
import io.reactivex.Single
import java.io.FileOutputStream
fun main(args: Array<String>) {
Observable.just(5, 4).subscribeOn(Schedulers.computation())
.concatMapSingle { workItem ->
val file = File("/tmp/foo/work$workItem")
Single.using({ FileOutputStream(file) }, { oStream ->
Single.just(oStream)
.subscribeOn(Schedulers.computation())
.map { os ->
println("Pretending to work on item $workItem")
os.write("File $workItem".toByteArray())
// if (Math.random () > 0.5) throw Exception ("Something else failed")
Thread.sleep(workItem.toLong() * 1000) // Second work item should have a quicker first step than the second step of the first item
Pair(file, workItem) // Propagate both file and item number
}.doOnError { println("Deleting ${file.absolutePath}"); file.delete() }
}, { os -> println("Closing file ${file.absolutePath}"); os.close(); })
}
.concatMapSingle { data1 ->
Single.using({ data1 }, { data2 ->
Single.just(data2)
.subscribeOn(Schedulers.computation())
.map { data ->
val workItem = data.second
println("Continuing pretend work on item ${workItem}");
Thread.sleep(workItem.toLong() * 1000)
// ... More complex stuff here ...
if (workItem == 5) throw Exception("Something failed")
}
}, { data -> println("Deleting ${data.first.absolutePath}"); data.first.delete(); })
}.blockingSubscribe();
}
Если выброшено исключение, файл /tmp/foo/work4
не удаляется, так как рабочий элемент "4" ожидает обработки 1сек вторым concatMap
. Выход:
Pretending to work on item 5
Closing file /tmp/foo/work5
Continuing pretend work on item 5
Pretending to work on item 4
Closing file /tmp/foo/work4
Deleting /tmp/foo/work5
Exception in thread "main" java.lang.RuntimeException: java.lang.Exception: Something failed
at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
[...]
Если выдается первое (закомментированное) исключение или исключение, все удаляется нормально. Проблема такая же с flatMap
, но сложнее отладить, так как параллельно выполняется больше вещей.
Поэтому мой вопрос: могу ли я связать некоторую функцию «очистки» с элементами (здесь: 5, 4), которая всегда вызывается, когда этот элемент выходит из области видимости?