Предположим, у меня есть пользовательский подписчик, который запрашивает одно значение в подписке, а затем дополнительное значение через три секунды после получения предыдущего значения:
class MySubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
private var subscription: Subscription?
func receive(subscription: Subscription) {
print("Subscribed")
self.subscription = subscription
subscription.request(.max(1))
}
func receive(_ input: Int) -> Subscribers.Demand {
print("Value: \(input)")
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(3)) {
self.subscription?.request(.max(1))
}
return .none
}
func receive(completion: Subscribers.Completion<Never>) {
print("Complete")
subscription = nil
}
}
Если я использую это для подписки на издателя с бесконечным диапазоном, обратное давление обрабатывается изящно: издатель ждет 3 секунды каждый раз, пока не получит следующее требование на отправку значения:
(1...).publisher.subscribe(MySubscriber())
// Prints values infinitely with ~3 seconds between each:
//
// Subscribed
// Value: 1
// Value: 2
// Value: 3
// ...
Но если я добавлю оператор map
, MySubscriber
даже не получит подписка; map
синхронно запрашивает Demand.Unlimited
после получения подписки, и приложение бесконечно вращается, когда map
пытается исчерпать бесконечный диапазон:
(1...).publisher
.map { value in
print("Map: \(value)")
return value * 2
}
.subscribe(MySubscriber())
// The `map` transform is executed infinitely with no delay:
//
// Map: 1
// Map: 2
// Map: 3
// ...
Мой вопрос: почему map
ведет себя сюда? Я бы ожидал, что map
просто передаст свои потребности в нисходящем потоке. Поскольку map
предполагается для преобразования, а не для побочных эффектов, я не понимаю, каков вариант использования для его текущего поведения.
EDIT
Я реализовал версию карты для покажите, как, по моему мнению, это должно работать:
extension Publishers {
struct MapLazily<Upstream: Publisher, Output>: Publisher {
typealias Failure = Upstream.Failure
let upstream: Upstream
let transform: (Upstream.Output) -> Output
init(upstream: Upstream, transform: @escaping (Upstream.Output) -> Output) {
self.upstream = upstream
self.transform = transform
}
public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Upstream.Failure {
let mapSubscriber = Subscribers.LazyMapSubscriber(downstream: subscriber, transform: transform)
upstream.receive(subscriber: mapSubscriber)
}
}
}
extension Subscribers {
class LazyMapSubscriber<Input, DownstreamSubscriber: Subscriber>: Subscriber {
let downstream: DownstreamSubscriber
let transform: (Input) -> DownstreamSubscriber.Input
init(downstream: DownstreamSubscriber, transform: @escaping (Input) -> DownstreamSubscriber.Input) {
self.downstream = downstream
self.transform = transform
}
func receive(subscription: Subscription) {
downstream.receive(subscription: subscription)
}
func receive(_ input: Input) -> Subscribers.Demand {
downstream.receive(transform(input))
}
func receive(completion: Subscribers.Completion<DownstreamSubscriber.Failure>) {
downstream.receive(completion: completion)
}
}
}
extension Publisher {
func mapLazily<Transformed>(transform: @escaping (Output) -> Transformed) -> AnyPublisher<Transformed, Failure> {
Publishers.MapLazily(upstream: self, transform: transform).eraseToAnyPublisher()
}
}
Используя этот оператор, MySubscriber
немедленно получает подписку, а преобразование mapLazily
выполняется только при наличии запроса:
(1...).publisher
.mapLazily { value in
print("Map: \(value)")
return value * 2
}
.subscribe(MySubscriber())
// Only transforms the values when they are demanded by the downstream subscriber every 3 seconds:
//
// Subscribed
// Map: 1
// Value: 2
// Map: 2
// Value: 4
// Map: 3
// Value: 6
// Map: 4
// Value: 8
Я предполагаю, что конкретная перегрузка map
, определенная для Publishers.Sequence
, использует какой-то ярлык для повышения производительности. Это нарушается для бесконечных последовательностей, но даже для конечных последовательностей, нетерпеливо исчерпывающих последовательность, независимо от того, что требуется ниже по течению, портит мою интуицию. На мой взгляд, следующий код:
(1...3).publisher
.map { value in
print("Map: \(value)")
return value * 2
}
.subscribe(MySubscriber())
должен печатать:
Subscribed
Map: 1
Value: 2
Map: 2
Value: 4
Map: 3
Value: 6
Complete
, но вместо этого печатает:
Map: 1
Map: 2
Map: 3
Subscribed
Value: 2
Value: 4
Value: 6
Complete