Как я могу предотвратить, чтобы объект PassthroughSubject убил .sink до того, как закончится параллельный восходящий фьючерс? - PullRequest
3 голосов
/ 11 июля 2019

У меня есть объект PassthroughSubject, который отправляет 30 целых чисел с последующим сообщением о завершении.

Получив эти числа от субъекта, я порождаю будущее, которое спит одну секунду и завершается вводом номера * 2.

Я использую .receiveOn, чтобы убедиться, что фьючерс работаетодновременно, но это означает, что сообщение о завершении также одновременно распространяется по цепочке и завершает приемник до того, как все фьючерсы будут завершены.

Любые мастера RxSwift / Combine там знают, как я могу сделать так, чтобы прием сообщения о завершении задерживался завершением будущего?

Вот игровая площадка, которая реализует описанное поведение:

import Foundation
import Combine
import PlaygroundSupport

/// Setting up the playground
PlaygroundPage.current.needsIndefiniteExecution = true

/// Injects numbers 0-30 into combine message stream, and then sends a finish.
func publishNumbers(to subject: PassthroughSubject<Int, Error>) {
    (0..<30).forEach {
        subject.send($0)
    }
    subject.send(completion: .finished)
}
/// Delays for one secont, and completes the future by doubling the input.
func delayAndDoubleNumber(_ int: Int) -> Future<Int, Error> {
    return Future<Int, Error> { complete in
        sleep(1)
        complete(.success(int * 2))
    }
}

// Properties involved in Combine processing chain.
let numbersSubject = PassthroughSubject<Int, Error>()
let processingQueue = DispatchQueue.global(qos: .userInitiated)


// Combine processing chain
numbersSubject
    .receive(on: processingQueue) //Comment this line to observe that all futures finish, and are collected before the finish message kills the sink.
    .flatMap { number in
        return delayAndDoubleNumber(number)
    }
    .collect(4)
    .sink(receiveCompletion: { completion in
        print("Complete: \(completion)")
    }, receiveValue: { value in
        print("Received Value: \(value)")
    })

publishNumbers(to: numbersSubject)

Ответы [ 2 ]

3 голосов
/ 11 июля 2019

Начиная с Xcode 11 beta 3, вы не можете использовать параллельную очередь с Combine. Вы должны быть в состоянии Xcode 11 GM.

Филипп Хауслер - инженер Apple, работающий на Combine. Он сказал следующее на официальном форуме Swift :

Также стоит отметить, что DispatchQueue, используемый в качестве планировщика, всегда должен быть последовательным, чтобы придерживаться контрактов операторов комбината.

Тогда позже он сказал это:

Таким образом, для продолжения здесь есть некоторые изменения в отношении того, как распространяются события ниже по течению. Теперь мы можем удовлетворить ограничение 1,03, даже если DispatchQueue является одновременным или OperationQueue не является ограничением maxConcurrentOperations, равным 1, или в этом отношении любой действительный планировщик является одновременным; мы всегда будем отправлять сериализованные события в этом запрошенном планировщике для .receive(on:). Единственное оставшееся предостережение, которое мы немного отклоняемся от спецификации, заключается в том, что восходящие события, такие как cancel() и request(_:) в нашем мире, могут происходить одновременно. При этом мы обращаемся с ними безопасным образом.

Вы можете заставить ваш параллелизм работать в Xcode 11 beta 3, отправляя его в параллельную очередь, а затем обратно в основную очередь из закрытия Future:

import Foundation
import Combine
import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

func delayAndDoubleNumber(_ int: Int) -> Future<Int, Never> {
    return Future<Int, Never> { complete in
        DispatchQueue.global(qos: .userInitiated).async {
            sleep(1)
            DispatchQueue.main.async {
                complete(.success(int * 2))
            }
        }
    }
}

let subject = PassthroughSubject<Int, Never>()

subject
    .flatMap { delayAndDoubleNumber($0) }
    .collect(4)
    .sink(
        receiveCompletion: { print("Complete: \($0)") },
        receiveValue: { print("Received Value: \($0)") })

let canceller = (0 ..< 30).publisher().subscribe(subject)
1 голос
/ 11 июля 2019

Отказ от ответственности, это может быть некорректной интерпретацией документов, но я думаю, что вы должны использовать оператор subscribe(on:) вместо receive(on:).

Apple Docs :

В отличие от получения (on: options :), которое влияет на последующие сообщения, подписка (on :) изменяет контекст выполнения исходящих сообщений.

Моя интерпретация этого, если вы хотите, чтобы события из вашего numbersSubject отправлялись в вашу очередь, вы должны использовать subscribe(on:), например:

numbersSubject
    .flatMap { number in
        return delayAndDoubleNumber(number)
    }
    .collect(4)
    .subscribe(on: processingQueue)
    .receive(on: RunLoop.main)
    .sink(receiveCompletion: { completion in
        print("Complete: \(completion)")
    }, receiveValue: { value in
        print("Received Value: \(value)")
    })
...