Swift Combine - Publishers.CombineLatest в нескольких потоках - PullRequest
1 голос
/ 04 марта 2020

При использовании Publishers.CombineLatest с издателями, которые работают в потоках, отличных от главного (Main), замыкание .sink для Publishers.CombineLatest вызывается не всегда.

Проблема возникает не каждый раз, поэтому я написал модульные тесты, которые повторяют проверку 100 раз подряд. Обычно они завершаются неудачей после 4-5 итераций.

import XCTest
import Combine

/// Stress tests for `Publishers.CombineLatest` when its inputs are driven from
/// background dispatch queues.
///
/// Each test repeats one scenario in a loop and waits until the sink has seen
/// the combined pair (10, 20).
class CombineLatestTests: XCTestCase {

    override func setUp() {
        // Stop a test at its first failure instead of running the remaining iterations.
        continueAfterFailure = false
    }

    /// Sends 5 and 10 through the first `PassthroughSubject` and 20 through the second.
    /// Each stream is moved onto its own global queue with `receive(on:)` before the
    /// two are combined.
    func testCombineLatest_receiveOn() {
        for x in 0...1000 {
            print("---------- RUN \(x)")

            let firstSubject = PassthroughSubject<Int, Never>()
            let secondSubject = PassthroughSubject<Int, Never>()

            let userInitiatedQueue = DispatchQueue.global(qos: .userInitiated)
            let backgroundQueue = DispatchQueue.global(qos: .background)

            // Each map only logs the value and the thread it was delivered on.
            let firstStream = firstSubject
                .receive(on: userInitiatedQueue)
                .map { (value: Int) -> Int in
                    print("-- Observer 1: \(value), Thread: \(Thread.current)")
                    return value
                }
            let secondStream = secondSubject
                .receive(on: backgroundQueue)
                .map { (value: Int) -> Int in
                    print("-- Observer 2: \(value), Thread: \(Thread.current)")
                    return value
                }

            let pairArrived = expectation(description: "expect values")
            pairArrived.assertForOverFulfill = false

            let subscription = Publishers.CombineLatest(firstStream, secondStream)
                .sink { value1, value2 in
                    print("-- recieved \(value1):\(value2) on \(Thread.current)")
                    // Only the final pair counts; every other combination is just logged.
                    guard value1 == 10, value2 == 20 else { return }
                    pairArrived.fulfill()
                }

            firstSubject.send(5)
            secondSubject.send(20)
            firstSubject.send(10)

            wait(for: [pairArrived], timeout: 10)
            subscription.cancel()
        }
    }

    /// Same scenario as above, but with `CurrentValueSubject`s that start at 0, so both
    /// streams already carry an initial value before 10 and 20 are sent.
    func testCombineLatest_currentValue_receiveOn() {
        for x in 0...100 {
            print("---------- RUN \(x)")

            let firstSubject = CurrentValueSubject<Int, Never>(0)
            let secondSubject = CurrentValueSubject<Int, Never>(0)

            let userInitiatedQueue = DispatchQueue.global(qos: .userInitiated)
            let backgroundQueue = DispatchQueue.global(qos: .background)

            let firstStream = firstSubject
                .receive(on: userInitiatedQueue)
                .map { (value: Int) -> Int in
                    print("-- Observer 1: \(value), Thread: \(Thread.current)")
                    return value
                }
            let secondStream = secondSubject
                .receive(on: backgroundQueue)
                .map { (value: Int) -> Int in
                    print("-- Observer 2: \(value), Thread: \(Thread.current)")
                    return value
                }

            let pairArrived = expectation(description: "expect values")
            pairArrived.assertForOverFulfill = false

            let subscription = Publishers.CombineLatest(firstStream, secondStream)
                .sink { value1, value2 in
                    print("-- recieved \(value1):\(value2) on \(Thread.current)")
                    guard value1 == 10, value2 == 20 else { return }
                    pairArrived.fulfill()
                }

            firstSubject.send(10)
            secondSubject.send(20)

            wait(for: [pairArrived], timeout: 3)
            subscription.cancel()
        }
    }

    /// Combines the two subjects without any `receive(on:)`; the values are sent from
    /// work items submitted to two different global queues instead.
    /// Despite the test's name, no `subscribe(on:)` operator is applied here.
    func testCombineLatest_subscribeOn() {
        for x in 0...100 {
            print("---------- RUN \(x)")

            let firstSubject = PassthroughSubject<Int, Never>()
            let secondSubject = PassthroughSubject<Int, Never>()

            let userInitiatedQueue = DispatchQueue.global(qos: .userInitiated)
            let backgroundQueue = DispatchQueue.global(qos: .background)

            let firstStream = firstSubject
                .map { (value: Int) -> Int in
                    print("-- Observer 1: \(value), Thread: \(Thread.current)")
                    return value
                }
            let secondStream = secondSubject
                .map { (value: Int) -> Int in
                    print("-- Observer 2: \(value), Thread: \(Thread.current)")
                    return value
                }

            let pairArrived = expectation(description: "expect values")
            pairArrived.assertForOverFulfill = false

            let subscription = Publishers.CombineLatest(firstStream, secondStream)
                .sink { value1, value2 in
                    print("-- recieved \(value1):\(value2) on \(Thread.current)")
                    guard value1 == 10, value2 == 20 else { return }
                    pairArrived.fulfill()
                }

            // Both values of the first subject are sent from one work item.
            userInitiatedQueue.async {
                firstSubject.send(5)
                firstSubject.send(10)
            }

            backgroundQueue.async {
                secondSubject.send(20)
            }

            wait(for: [pairArrived], timeout: 5)
            subscription.cancel()
        }
    }

}

Вот лог теста testCombineLatest_currentValue_receiveOn

Test Case '-[xxxx.CombineLatestTests testCombineLatest_currentValue_receiveOn]' started.
---------- RUN 0
-- Observer 2: 0, Thread: <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 1: 0, Thread: <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x600000439880>{number = 4, name = (null)}
-- recieved 10:20 on <NSThread: 0x600000439880>{number = 4, name = (null)}
---------- RUN 1
-- Observer 2: 0, Thread: <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- Observer 1: 0, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
-- recieved 10:20 on <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
---------- RUN 2
-- Observer 2: 0, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- Observer 1: 0, Thread: <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x600000439880>{number = 4, name = (null)}
-- recieved 10:20 on <NSThread: 0x600000439880>{number = 4, name = (null)}
---------- RUN 3
-- Observer 2: 0, Thread: <NSThread: 0x600000439880>{number = 4, name = (null)}
-- Observer 1: 0, Thread: <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004e0f80>{number = 9, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
-- recieved 10:20 on <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
---------- RUN 4
-- Observer 1: 0, Thread: <NSThread: 0x6000004f6e00>{number = 6, name = (null)}
-- Observer 2: 0, Thread: <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- recieved 0:0 on <NSThread: 0x6000004f0000>{number = 7, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x600000439880>{number = 4, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000004e80c0>{number = 10, name = (null)}
-- recieved 10:0 on <NSThread: 0x600000439880>{number = 4, name = (null)}
CombineLatestTests.swift:93: error: : Asynchronous wait failed: Exceeded timeout of 3 seconds, with unfulfilled expectations: "expect values".
Test Suite 'CombineLatestTests' failed at 2020-03-04 20:37:24.957.
     Executed 3 tests, with 3 failures (0 unexpected) in 18.159 (18.161) seconds

1 Ответ

0 голосов
/ 05 марта 2020

попробуйте этот тест

 /// Checks that `Publishers.CombineLatest` eventually delivers the pair (10, 20) when
 /// both inputs are funnelled through the same serial queue before being combined.
 ///
 /// The two subjects are fed concurrently from a global queue. Both values of `subj1`
 /// are sent from a single work item so that 5 is always sent before 10.
 func testCombineLatest_receiveOn() {
        for x in 0...100 {
            print("---------- RUN \(x)")
            // q1 is created without the `.concurrent` attribute, i.e. it is a serial queue.
            let q1 = DispatchQueue(label: "q1", qos: .background)
            let q2 = DispatchQueue(label: "q2", qos: .utility, attributes: .concurrent)

            let subj1 = PassthroughSubject<Int, Never>()
            let subj2 = PassthroughSubject<Int, Never>()

            // The maps only log each value together with the thread it was sent from.
            let publ1 = subj1
                .map { value -> Int in
                    print("-- Observer 1: \(value), Thread: \(Thread.current)")
                    return value
                }
            let publ2 = subj2
                .map { value -> Int in
                    print("-- Observer 2: \(value), Thread: \(Thread.current)")
                    return value
                }

            let exp = expectation(description: "expect values")
            exp.assertForOverFulfill = false
            // Use the same serial queue for all publishers that are going to be combined,
            // so CombineLatest receives its inputs one at a time.
            let canc = Publishers.CombineLatest(publ1.receive(on: q1), publ2.receive(on: q1))
                // This only hands the already combined pairs over to another queue and does
                // not influence how the inputs are combined. NOTE: q2 is concurrent, so the
                // sink may presumably see pairs out of order; the test only needs (10, 20)
                // to show up at some point.
                .receive(on: q2)
                .sink { value1, value2 in
                    print("-- recieved \(value1): \(value2) on \(Thread.current)")
                    if value1 == 10, value2 == 20 {
                        exp.fulfill()
                    }
                }

            let q = DispatchQueue.global()
            // The two subjects are still updated concurrently, but both values of subj1
            // go through one work item: separate async blocks on a concurrent queue may
            // run in either order, and if 10 were sent before 5 (and before 20) the pair
            // (10, 20) would never be emitted and the test would time out.
            q.async {
                subj1.send(5)
                subj1.send(10)
            }
            q.async {
                subj2.send(20)
            }

            wait(for: [exp], timeout: 10)
            canc.cancel()
        }
    }

Запустите его, изучите вывод и прочитайте комментарии в приведённом выше фрагменте.

часть распечатки

---------- RUN 6
-- Observer 1: 5, Thread: <NSThread: 0x6000020aec40>{number = 6, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x6000020aec40>{number = 6, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000020d1380>{number = 4, name = (null)}
-- recieved 10: 20 on <NSThread: 0x6000020aec40>{number = 6, name = (null)}
---------- RUN 7
-- Observer 1: 5, Thread: <NSThread: 0x6000020aec40>{number = 6, name = (null)}
-- Observer 2: 20, Thread: <NSThread: 0x6000020d1380>{number = 4, name = (null)}
-- Observer 1: 10, Thread: <NSThread: 0x6000020aec00>{number = 7, name = (null)}
-- recieved 5: 20 on <NSThread: 0x6000020aec00>{number = 7, name = (null)}
-- recieved 10: 20 on <NSThread: 0x6000020aec40>{number = 6, name = (null)}

Здесь можно увидеть эффект от одновременной отправки значений.

...