Почему `Publishers.Map` с нетерпением потребляет вышестоящие значения? - PullRequest
2 голосов
/ 10 апреля 2020

Предположим, у меня есть пользовательский подписчик, который запрашивает одно значение в подписке, а затем дополнительное значение через три секунды после получения предыдущего значения:

class MySubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private var subscription: Subscription?

    func receive(subscription: Subscription) {
        print("Subscribed")

        self.subscription = subscription
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        print("Value: \(input)")

        DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(3)) {
            self.subscription?.request(.max(1))
        }

        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        print("Complete")
        subscription = nil
    }
}

Если я использую это для подписки на издателя с бесконечным диапазоном, обратное давление обрабатывается изящно: издатель ждет 3 секунды каждый раз, пока не получит следующее требование на отправку значения:

(1...).publisher.subscribe(MySubscriber())

// Prints values infinitely with ~3 seconds between each:
//
//     Subscribed
//     Value: 1
//     Value: 2
//     Value: 3
//     ...

Но если я добавлю оператор map, MySubscriber даже не получит подписка; map синхронно запрашивает Demand.Unlimited после получения подписки, и приложение бесконечно вращается, когда map пытается исчерпать бесконечный диапазон:

(1...).publisher
    .map { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

// The `map` transform is executed infinitely with no delay:
//
//     Map: 1
//     Map: 2
//     Map: 3
//     ...

Мой вопрос: почему map ведет себя сюда? Я бы ожидал, что map просто передаст свои потребности в нисходящем потоке. Поскольку map предполагается для преобразования, а не для побочных эффектов, я не понимаю, каков вариант использования для его текущего поведения.

EDIT

Я реализовал версию карты для покажите, как, по моему мнению, это должно работать:

extension Publishers {
    struct MapLazily<Upstream: Publisher, Output>: Publisher {
        typealias Failure = Upstream.Failure

        let upstream: Upstream
        let transform: (Upstream.Output) -> Output

        init(upstream: Upstream, transform: @escaping (Upstream.Output) -> Output) {
            self.upstream = upstream
            self.transform = transform
        }

        public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Upstream.Failure {
            let mapSubscriber = Subscribers.LazyMapSubscriber(downstream: subscriber, transform: transform)
            upstream.receive(subscriber: mapSubscriber)
        }
    }
}

extension Subscribers {
    class LazyMapSubscriber<Input, DownstreamSubscriber: Subscriber>: Subscriber {
        let downstream: DownstreamSubscriber
        let transform: (Input) -> DownstreamSubscriber.Input

        init(downstream: DownstreamSubscriber, transform: @escaping (Input) -> DownstreamSubscriber.Input) {
            self.downstream = downstream
            self.transform = transform
        }

        func receive(subscription: Subscription) {
            downstream.receive(subscription: subscription)
        }

        func receive(_ input: Input) -> Subscribers.Demand {
            downstream.receive(transform(input))
        }

        func receive(completion: Subscribers.Completion<DownstreamSubscriber.Failure>) {
            downstream.receive(completion: completion)
        }
    }
}

extension Publisher {
    func mapLazily<Transformed>(transform: @escaping (Output) -> Transformed) -> AnyPublisher<Transformed, Failure> {
        Publishers.MapLazily(upstream: self, transform: transform).eraseToAnyPublisher()
    }
}

Используя этот оператор, MySubscriber немедленно получает подписку, а преобразование mapLazily выполняется только при наличии запроса:

(1...).publisher
    .mapLazily { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

// Only transforms the values when they are demanded by the downstream subscriber every 3 seconds:
//
//     Subscribed
//     Map: 1
//     Value: 2
//     Map: 2
//     Value: 4
//     Map: 3
//     Value: 6
//     Map: 4
//     Value: 8

Я предполагаю, что конкретная перегрузка map, определенная для Publishers.Sequence, использует какой-то ярлык для повышения производительности. Это нарушается для бесконечных последовательностей, но даже для конечных последовательностей, нетерпеливо исчерпывающих последовательность, независимо от того, что требуется ниже по течению, портит мою интуицию. На мой взгляд, следующий код:

(1...3).publisher
    .map { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

должен печатать:

Subscribed
Map: 1
Value: 2
Map: 2
Value: 4
Map: 3
Value: 6
Complete

, но вместо этого печатает:

Map: 1
Map: 2
Map: 3
Subscribed
Value: 2
Value: 4
Value: 6
Complete

1 Ответ

2 голосов
/ 11 апреля 2020

Вот более простой тест, который не включает каких-либо пользовательских подписчиков:

(1...).publisher
    //.map { $0 }
    .flatMap(maxPublishers: .max(1)) {
        (i:Int) -> AnyPublisher<Int,Never> in
        Just<Int>(i)
            .delay(for: 3, scheduler: DispatchQueue.main)
            .eraseToAnyPublisher()
}
.sink { print($0) }
.store(in: &storage)

Он работает, как и ожидалось, но тогда, если вы раскомментируете .map, вы ничего не получите, потому что оператор .map Накапливая бесконечные значения в восходящем потоке, ничего не публикуя.

Исходя из вашей гипотезы о том, что map каким-то образом оптимизирует работу для предыдущего издателя последовательности, я попробовал этот обходной путь:

(1...).publisher.eraseToAnyPublisher()
    .map { $0 }
    // ...

И конечно достаточно, это решило проблему! Скрывая издатель последовательности от оператора карты, мы предотвращаем оптимизацию.

...