Я сам реализовал этот оператор. Фактически я создал пять операторов, все из которых основаны на одной и той же общей (internal
) функции. Я добавил несколько юнит-тестов для них, и они, кажется, работают нормально.
Пожалуйста, дайте мне знать, если вы найдете какие-либо ошибки / возможности для улучшения или лучшего решения вообще
Использование
// finish when `barPublisher` completes with `.finish`
fooPublisher.prefix(untilFinishFrom: barPublisher)
// finish when `barPublisher` completes with `.output` OR `.finish`
fooPublisher.prefix(untilOutputOrFinishFrom: barPublisher)
// finish when `barPublisher` completes either with `.finish` OR `.failure`
fooPublisher.prefix(untilCompletionFrom: barPublisher)
// finish when `barPublisher` completes either with `.output` OR `.finish` OR `.failure`
fooPublisher.prefix(untilCompletionFrom: barPublisher)
// finish when `barPublisher` completes with `.failure`
// (I'm not so sure how useful this is... might be better to handle with an of
// the operators working with errors)
fooPublisher.prefix(untilFailureFrom: barPublisher)
Решение
Реализация
internal extension Publisher {
func prefix<CompletionTrigger>(
untilEventFrom completionTriggeringPublisher: CompletionTrigger,
completionTriggerOptions: Publishers.CompletionTriggerOptions
) -> AnyPublisher<Output, Failure> where CompletionTrigger: Publisher {
guard completionTriggerOptions != .output else {
// Fallback to Combine's bundled operator
return self.prefix(untilOutputFrom: completionTriggeringPublisher).eraseToAnyPublisher()
}
let completionAsOutputSubject = PassthroughSubject<Void, Never>()
var cancellable: Cancellable? = completionTriggeringPublisher
.sink(
receiveCompletion: { completion in
switch completion {
case .failure:
guard completionTriggerOptions.contains(.failure) else { return }
completionAsOutputSubject.send()
case .finished:
guard completionTriggerOptions.contains(.finish) else { return }
completionAsOutputSubject.send()
}
},
receiveValue: { _ in
guard completionTriggerOptions.contains(.output) else { return }
completionAsOutputSubject.send()
}
)
func cleanUp() {
cancellable = nil
}
return self.prefix(untilOutputFrom: completionAsOutputSubject)
.handleEvents(
receiveCompletion: { _ in cleanUp() },
receiveCancel: {
cancellable?.cancel()
cleanUp()
}
)
.eraseToAnyPublisher()
}
}
Помощники
// MARK: Publishers + CompletionTriggerOptions
public extension Publishers {
struct CompletionTriggerOptions: OptionSet {
public let rawValue: Int
public init(rawValue: Int) {
self.rawValue = rawValue
}
}
}
public extension Publishers.CompletionTriggerOptions {
static let output = Self(rawValue: 1 << 0)
static let finish = Self(rawValue: 1 << 1)
static let failure = Self(rawValue: 1 << 2)
static let completion: Self = [.finish, .failure]
static let all: Self = [.output, .finish, .failure]
}
Операторы
public extension Publisher {
func prefix<CompletionTrigger>(
untilCompletionFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: .completion)
}
func prefix<CompletionTrigger>(
untilFinishFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: .finish)
}
func prefix<CompletionTrigger>(
untilFailureFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: .failure)
}
func prefix<CompletionTrigger>(
untilOutputOrFinishFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: [.output, .finish])
}
///
func prefix<CompletionTrigger>(
untilOutputOrCompletionFrom completionTriggeringPublisher: CompletionTrigger
) -> AnyPublisher<Output, Failure>
where CompletionTrigger: Publisher
{
prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: [.output, .completion])
}
}
Модульные тесты
import Foundation
import XCTest
import Combine
final class PrefixUntilCompletionFromTests: TestCase {
// MARK: Combine's bundled
func test_that_publisher___prefix_untilOutputFrom___completes_when_received_output() {
let finishTriggeringSubject = PassthroughSubject<Void, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send()
}
) {
return $0.merge(with: $1).prefix(untilOutputFrom: finishTriggeringSubject).eraseToAnyPublisher()
}
}
// MARK: Custom `prefix(until*`
// MARK: `prefix:untilCompletionFrom`
func test_that_publisher___prefix_untilCompletionFrom___completes_when_received_finish() {
let finishTriggeringSubject = PassthroughSubject<Int, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .finished)
}
) {
$0.merge(with: $1).prefix(untilCompletionFrom: finishTriggeringSubject)
}
}
// MARK: `prefix:untilOutputOrFinishFrom`
func test_that_publisher___prefix_untilOutputOrFinishFrom___completes_when_received_finish() {
let finishTriggeringSubject = PassthroughSubject<Int, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .finished)
}
) {
$0.merge(with: $1).prefix(untilOutputOrFinishFrom: finishTriggeringSubject)
}
}
func test_that_publisher___prefix_untilOutputOrFinishFrom___completes_when_received_output() {
let finishTriggeringSubject = PassthroughSubject<Void, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send()
}
) {
$0.merge(with: $1).prefix(untilOutputOrFinishFrom: finishTriggeringSubject)
}
}
// MARK: `prefix:untilOutputOrCompletionFrom`
func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_finish() {
let finishTriggeringSubject = PassthroughSubject<Int, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .finished)
}
) {
$0.merge(with: $1).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
}
}
func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_output() {
let finishTriggeringSubject = PassthroughSubject<Void, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send()
}
) {
$0.merge(with: $1).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
}
}
func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_failure() {
struct ErrorMarker: Swift.Error {}
let finishTriggeringSubject = PassthroughSubject<Void, ErrorMarker>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .failure(ErrorMarker()))
}
) {
$0.merge(with: $1).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
}
}
// MARK: `prefix:untilFailureFrom`
func test_that_publisher___prefix_untilFailureFrom___completes_when_received_output() {
struct ErrorMarker: Swift.Error {}
let finishTriggeringSubject = PassthroughSubject<Void, ErrorMarker>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send(completion: .failure(ErrorMarker()))
}
) {
$0.merge(with: $1).prefix(untilFailureFrom: finishTriggeringSubject)
}
}
// MARK: `prefix:untilEventFrom`
func test_that_publisher___prefix_untilEventFrom___outut_completes_when_received_output() {
let finishTriggeringSubject = PassthroughSubject<Void, Never>()
doTestPublisherCompletes(
triggerFinish: {
finishTriggeringSubject.send()
}
) {
$0.merge(with: $1).prefix(untilEventFrom: finishTriggeringSubject, completionTriggerOptions: [.output])
}
}
func doTestPublisherCompletes(
_ line: UInt = #line,
triggerFinish: () -> Void,
makePublisherToTest: (
_ first: AnyPublisher<Int, Never>,
_ second: AnyPublisher<Int, Never>
) -> AnyPublisher<Int, Never>
) {
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
let publisherToTest = makePublisherToTest(
first.eraseToAnyPublisher(),
second.eraseToAnyPublisher()
)
var returnValues = [Int]()
let expectation = XCTestExpectation(description: self.debugDescription)
let cancellable = publisherToTest
.sink(
receiveCompletion: { _ in expectation.fulfill() },
receiveValue: { returnValues.append($0) }
)
first.send(1)
first.send(2)
first.send(completion: .finished)
first.send(3)
second.send(4)
triggerFinish()
second.send(5)
wait(for: [expectation], timeout: 0.1)
// output `3` sent by subject `first` is ignored, since it's sent after it has completed.
// output `5` sent by subject `second` is ignored since it's sent after our `publisherToTest` has completed
XCTAssertEqual(returnValues, [1, 2, 4], line: line)
XCTAssertNotNil(cancellable, line: line)
}
}