Быстрое объединение: префикс (beforeCompletionFrom) `? - PullRequest
0 голосов
/ 07 ноября 2019

RxJava имеет оператор takeUntil, документация описывает его как:

отбрасывает любые объекты, испускаемые наблюдаемым после того, как второе наблюдаемое испускает элемент или завершает

Последняя часть - это то, чего я хочу достичь, используя Combine. Но я не нашел ни одного эквивалентного оператора. Единственный похожий оператор, который я нахожу, это prefix: untilOutputFrom, документация:

Повторная публикация элементов до тех пор, пока другой издатель не выпустит элемент.

Итак, учитывая:

fooPublisher.prefix(untilOutputFrom: barPublisher)

Не действует так, как я хочу, потому что заканчивается только тогда, когда barPublisher испускает элемент. Но я бы хотел, чтобы какой-нибудь оператор завершил работу после barPublisher.

Я что-то здесь упустил? Существует ли оператор, которого я хочу, под другим именем?

1 Ответ

0 голосов
/ 07 ноября 2019

Я сам реализовал этот оператор. Фактически я создал пять операторов, все из которых основаны на одной и той же общей (internal) функции. Я добавил несколько юнит-тестов для них, и они, кажется, работают нормально.

Пожалуйста, дайте мне знать, если вы найдете какие-либо ошибки / возможности для улучшения или лучшего решения вообще

Использование

// finish when `barPublisher` completes with `.finish`
fooPublisher.prefix(untilFinishFrom: barPublisher)

// finish when `barPublisher` completes with `.output` OR `.finish`
fooPublisher.prefix(untilOutputOrFinishFrom: barPublisher)

// finish when `barPublisher` completes either with `.finish` OR `.failure`
fooPublisher.prefix(untilCompletionFrom: barPublisher)

// finish when `barPublisher` completes either with `.output` OR `.finish` OR `.failure`
fooPublisher.prefix(untilCompletionFrom: barPublisher)

// finish when `barPublisher` completes with `.failure` 
// (I'm not so sure how useful this is... might be better to handle with an of
// the operators working with errors)
fooPublisher.prefix(untilFailureFrom: barPublisher)

Решение

Реализация

internal extension Publisher {
    func prefix<CompletionTrigger>(
        untilEventFrom completionTriggeringPublisher: CompletionTrigger,
        completionTriggerOptions: Publishers.CompletionTriggerOptions
    ) -> AnyPublisher<Output, Failure> where CompletionTrigger: Publisher {

        guard completionTriggerOptions != .output else {
            // Fallback to Combine's bundled operator
            return self.prefix(untilOutputFrom: completionTriggeringPublisher).eraseToAnyPublisher()
        }

        let completionAsOutputSubject = PassthroughSubject<Void, Never>()

        var cancellable: Cancellable? = completionTriggeringPublisher
            .sink(
                receiveCompletion: { completion in
                    switch completion {
                    case .failure:
                        guard completionTriggerOptions.contains(.failure) else { return }
                        completionAsOutputSubject.send()
                    case .finished:
                        guard completionTriggerOptions.contains(.finish) else { return }
                        completionAsOutputSubject.send()
                    }
                },
                receiveValue: { _ in
                    guard completionTriggerOptions.contains(.output) else { return }
                    completionAsOutputSubject.send()
            }
        )

        func cleanUp() {
            cancellable = nil
        }

        return self.prefix(untilOutputFrom: completionAsOutputSubject)
            .handleEvents(
                receiveCompletion: { _ in cleanUp() },
                receiveCancel: {
                    cancellable?.cancel()
                    cleanUp()
            }
        )
            .eraseToAnyPublisher()

    }
}

Помощники

// MARK: Publishers + CompletionTriggerOptions
public extension Publishers {
    struct CompletionTriggerOptions: OptionSet {
        public let rawValue: Int
        public init(rawValue: Int) {
            self.rawValue = rawValue
        }
    }
}

public extension Publishers.CompletionTriggerOptions {
    static let output   = Self(rawValue: 1 << 0)
    static let finish   = Self(rawValue: 1 << 1)
    static let failure  = Self(rawValue: 1 << 2)

    static let completion: Self =  [.finish, .failure]
    static let all: Self =  [.output, .finish, .failure]
}

Операторы

public extension Publisher {

    func prefix<CompletionTrigger>(
        untilCompletionFrom completionTriggeringPublisher: CompletionTrigger
    ) -> AnyPublisher<Output, Failure>
        where CompletionTrigger: Publisher
    {
        prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: .completion)
    }

    func prefix<CompletionTrigger>(
        untilFinishFrom completionTriggeringPublisher: CompletionTrigger
    ) -> AnyPublisher<Output, Failure>
        where CompletionTrigger: Publisher
    {
        prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: .finish)
    }

    func prefix<CompletionTrigger>(
        untilFailureFrom completionTriggeringPublisher: CompletionTrigger
    ) -> AnyPublisher<Output, Failure>
        where CompletionTrigger: Publisher
    {
        prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: .failure)
    }

    func prefix<CompletionTrigger>(
        untilOutputOrFinishFrom completionTriggeringPublisher: CompletionTrigger
    ) -> AnyPublisher<Output, Failure>
        where CompletionTrigger: Publisher
    {
        prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: [.output, .finish])
    }

    ///
    func prefix<CompletionTrigger>(
        untilOutputOrCompletionFrom completionTriggeringPublisher: CompletionTrigger
    ) -> AnyPublisher<Output, Failure>
        where CompletionTrigger: Publisher
    {
        prefix(untilEventFrom: completionTriggeringPublisher, completionTriggerOptions: [.output, .completion])
    }
}

Модульные тесты


import Foundation
import XCTest
import Combine

final class PrefixUntilCompletionFromTests: TestCase {

    // MARK: Combine's bundled
    func test_that_publisher___prefix_untilOutputFrom___completes_when_received_output() {

        let finishTriggeringSubject = PassthroughSubject<Void, Never>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send()
        }
        ) {
            return $0.merge(with: $1).prefix(untilOutputFrom: finishTriggeringSubject).eraseToAnyPublisher()
        }

    }

    // MARK: Custom `prefix(until*`

    // MARK: `prefix:untilCompletionFrom`
    func test_that_publisher___prefix_untilCompletionFrom___completes_when_received_finish() {

        let finishTriggeringSubject = PassthroughSubject<Int, Never>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send(completion: .finished)
        }
        ) {
            $0.merge(with: $1).prefix(untilCompletionFrom: finishTriggeringSubject)
        }
    }

    // MARK: `prefix:untilOutputOrFinishFrom`
    func test_that_publisher___prefix_untilOutputOrFinishFrom___completes_when_received_finish() {

        let finishTriggeringSubject = PassthroughSubject<Int, Never>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send(completion: .finished)
        }
        ) {
            $0.merge(with: $1).prefix(untilOutputOrFinishFrom: finishTriggeringSubject)
        }
    }


    func test_that_publisher___prefix_untilOutputOrFinishFrom___completes_when_received_output() {

        let finishTriggeringSubject = PassthroughSubject<Void, Never>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send()
        }
        ) {
            $0.merge(with: $1).prefix(untilOutputOrFinishFrom: finishTriggeringSubject)
        }
    }

    // MARK: `prefix:untilOutputOrCompletionFrom`
    func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_finish() {

        let finishTriggeringSubject = PassthroughSubject<Int, Never>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send(completion: .finished)
        }
        ) {
            $0.merge(with: $1).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
        }
    }


    func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_output() {

        let finishTriggeringSubject = PassthroughSubject<Void, Never>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send()
        }
        ) {
            $0.merge(with: $1).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
        }
    }

    func test_that_publisher___prefix_untilOutputOrCompletionFrom___completes_when_received_failure() {
        struct ErrorMarker: Swift.Error {}
        let finishTriggeringSubject = PassthroughSubject<Void, ErrorMarker>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send(completion: .failure(ErrorMarker()))
        }
        ) {
            $0.merge(with: $1).prefix(untilOutputOrCompletionFrom: finishTriggeringSubject)
        }
    }

    // MARK: `prefix:untilFailureFrom`
    func test_that_publisher___prefix_untilFailureFrom___completes_when_received_output() {
        struct ErrorMarker: Swift.Error {}
        let finishTriggeringSubject = PassthroughSubject<Void, ErrorMarker>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send(completion: .failure(ErrorMarker()))
        }
        ) {
            $0.merge(with: $1).prefix(untilFailureFrom: finishTriggeringSubject)
        }
    }

    // MARK: `prefix:untilEventFrom`
    func test_that_publisher___prefix_untilEventFrom___outut_completes_when_received_output() {

        let finishTriggeringSubject = PassthroughSubject<Void, Never>()

        doTestPublisherCompletes(
            triggerFinish: {
                finishTriggeringSubject.send()
        }
        ) {
            $0.merge(with: $1).prefix(untilEventFrom: finishTriggeringSubject, completionTriggerOptions: [.output])
        }
    }

    func doTestPublisherCompletes(
        _ line: UInt = #line,

        triggerFinish: () -> Void,

        makePublisherToTest: (
        _ first: AnyPublisher<Int, Never>,
        _ second: AnyPublisher<Int, Never>
        ) -> AnyPublisher<Int, Never>
    ) {

        let first = PassthroughSubject<Int, Never>()
        let second = PassthroughSubject<Int, Never>()

        let publisherToTest = makePublisherToTest(
            first.eraseToAnyPublisher(),
            second.eraseToAnyPublisher()
        )

        var returnValues = [Int]()
        let expectation = XCTestExpectation(description: self.debugDescription)

        let cancellable = publisherToTest
            .sink(
                receiveCompletion: { _ in expectation.fulfill() },
                receiveValue: { returnValues.append($0) }
        )

        first.send(1)
        first.send(2)
        first.send(completion: .finished)
        first.send(3)
        second.send(4)
        triggerFinish()
        second.send(5)

        wait(for: [expectation], timeout: 0.1)

        // output `3` sent by subject `first` is ignored, since it's sent after it has completed.
        // output `5` sent by subject `second` is ignored since it's sent after our `publisherToTest` has completed
        XCTAssertEqual(returnValues, [1, 2, 4], line: line)

        XCTAssertNotNil(cancellable, line: line)
    }


}

...