У меня есть объект PassthroughSubject, который отправляет 30 целых чисел с последующим сообщением о завершении.
Получив эти числа от субъекта, я порождаю будущее, которое спит одну секунду и завершается вводом номера * 2.
Я использую .receiveOn, чтобы убедиться, что фьючерс работаетодновременно, но это означает, что сообщение о завершении также одновременно распространяется по цепочке и завершает приемник до того, как все фьючерсы будут завершены.
Любые мастера RxSwift / Combine там знают, как я могу сделать так, чтобы прием сообщения о завершении задерживался завершением будущего?
Вот игровая площадка, которая реализует описанное поведение:
import Foundation
import Combine
import PlaygroundSupport
/// Setting up the playground
PlaygroundPage.current.needsIndefiniteExecution = true
/// Injects numbers 0-30 into combine message stream, and then sends a finish.
func publishNumbers(to subject: PassthroughSubject<Int, Error>) {
(0..<30).forEach {
subject.send($0)
}
subject.send(completion: .finished)
}
/// Delays for one secont, and completes the future by doubling the input.
func delayAndDoubleNumber(_ int: Int) -> Future<Int, Error> {
return Future<Int, Error> { complete in
sleep(1)
complete(.success(int * 2))
}
}
// Properties involved in Combine processing chain.
let numbersSubject = PassthroughSubject<Int, Error>()
let processingQueue = DispatchQueue.global(qos: .userInitiated)
// Combine processing chain
numbersSubject
.receive(on: processingQueue) //Comment this line to observe that all futures finish, and are collected before the finish message kills the sink.
.flatMap { number in
return delayAndDoubleNumber(number)
}
.collect(4)
.sink(receiveCompletion: { completion in
print("Complete: \(completion)")
}, receiveValue: { value in
print("Received Value: \(value)")
})
publishNumbers(to: numbersSubject)